Files
nomad/command/agent/csi_endpoint.go
Tim Gross a65358da7b dynamic host volumes: HTTP API endpoint (#24380)
This changeset implements the HTTP API endpoints for Dynamic Host Volumes.

The `GET /v1/volumes` endpoint is shared between CSI and DHV with a query
parameter for the type. In the interest of getting some working handlers
available for use in development (and minimizing the size of the diff to
review), this changeset doesn't do any sort of refactoring of how the existing
List Volumes CSI endpoint works. That will come in a later PR, as will the
corresponding `api` package updates we need to support the CLI.

Ref: https://hashicorp.atlassian.net/browse/NET-11549
2024-12-19 09:25:54 -05:00

430 lines
11 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"net/http"
"strconv"
"strings"
"github.com/hashicorp/nomad/nomad/structs"
)
func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.csiVolumeRegister(resp, req)
case http.MethodGet:
default:
return nil, CodedError(405, ErrInvalidMethod)
}
// Type filters volume lists to a specific type. When support for non-CSI volumes is
// introduced, we'll need to dispatch here
query := req.URL.Query()
qtype, ok := query["type"]
if !ok {
return []*structs.CSIVolListStub{}, nil
}
// TODO(1.10.0): move handling of GET /v1/volumes/ out so that we're not
// co-mingling the call for listing host volume here
switch qtype[0] {
case "host":
return s.HostVolumesListRequest(resp, req)
case "csi":
default:
return nil, nil
}
args := structs.CSIVolumeListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
args.Prefix = query.Get("prefix")
args.PluginID = query.Get("plugin_id")
args.NodeID = query.Get("node_id")
var out structs.CSIVolumeListResponse
if err := s.agent.RPC("CSIVolume.List", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Volumes, nil
}
func (s *HTTPServer) CSIExternalVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.CSIVolumeExternalListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
query := req.URL.Query()
args.PluginID = query.Get("plugin_id")
var out structs.CSIVolumeExternalListResponse
if err := s.agent.RPC("CSIVolume.ListExternal", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out, nil
}
// CSIVolumeSpecificRequest dispatches GET and PUT
func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Tokenize the suffix of the path to get the volume id
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/volume/csi/")
tokens := strings.Split(reqSuffix, "/")
if len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
}
id := tokens[0]
if len(tokens) == 1 {
switch req.Method {
case http.MethodGet:
return s.csiVolumeGet(id, resp, req)
case http.MethodPut:
return s.csiVolumeRegister(resp, req)
case http.MethodDelete:
return s.csiVolumeDeregister(id, resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}
if len(tokens) == 2 {
switch req.Method {
case http.MethodPut:
if tokens[1] == "create" {
return s.csiVolumeCreate(resp, req)
}
case http.MethodDelete:
if tokens[1] == "detach" {
return s.csiVolumeDetach(id, resp, req)
}
if tokens[1] == "delete" {
return s.csiVolumeDelete(id, resp, req)
}
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}
return nil, CodedError(404, resourceNotFoundErr)
}
func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSIVolumeGetRequest{
ID: id,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.CSIVolumeGetResponse
if err := s.agent.RPC("CSIVolume.Get", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Volume == nil {
return nil, CodedError(404, "volume not found")
}
return out.Volume, nil
}
func (s *HTTPServer) csiVolumeRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPost, http.MethodPut:
default:
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.CSIVolumeRegisterRequest{}
if err := decodeBody(req, &args); err != nil {
return err, CodedError(400, err.Error())
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSIVolumeRegisterResponse
if err := s.agent.RPC("CSIVolume.Register", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) csiVolumeCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPost, http.MethodPut:
default:
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.CSIVolumeCreateRequest{}
if err := decodeBody(req, &args); err != nil {
return err, CodedError(400, err.Error())
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSIVolumeCreateResponse
if err := s.agent.RPC("CSIVolume.Create", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out, nil
}
func (s *HTTPServer) csiVolumeDeregister(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
raw := req.URL.Query().Get("force")
var force bool
if raw != "" {
var err error
force, err = strconv.ParseBool(raw)
if err != nil {
return nil, CodedError(400, "invalid force value")
}
}
args := structs.CSIVolumeDeregisterRequest{
VolumeIDs: []string{id},
Force: force,
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSIVolumeDeregisterResponse
if err := s.agent.RPC("CSIVolume.Deregister", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
secrets := parseCSISecrets(req)
args := structs.CSIVolumeDeleteRequest{
VolumeIDs: []string{id},
Secrets: secrets,
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSIVolumeDeleteResponse
if err := s.agent.RPC("CSIVolume.Delete", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
nodeID := req.URL.Query().Get("node")
if nodeID == "" {
return nil, CodedError(400, "detach requires node ID")
}
args := structs.CSIVolumeUnpublishRequest{
VolumeID: id,
Claim: &structs.CSIVolumeClaim{
NodeID: nodeID,
Mode: structs.CSIVolumeClaimGC,
},
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSIVolumeUnpublishResponse
if err := s.agent.RPC("CSIVolume.Unpublish", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) CSISnapshotsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.csiSnapshotCreate(resp, req)
case http.MethodDelete:
return s.csiSnapshotDelete(resp, req)
case http.MethodGet:
return s.csiSnapshotList(resp, req)
}
return nil, CodedError(405, ErrInvalidMethod)
}
func (s *HTTPServer) csiSnapshotCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotCreateRequest{}
if err := decodeBody(req, &args); err != nil {
return err, CodedError(400, err.Error())
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSISnapshotCreateResponse
if err := s.agent.RPC("CSIVolume.CreateSnapshot", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out, nil
}
func (s *HTTPServer) csiSnapshotDelete(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotDeleteRequest{}
s.parseWriteRequest(req, &args.WriteRequest)
snap := &structs.CSISnapshot{Secrets: structs.CSISecrets{}}
query := req.URL.Query()
snap.PluginID = query.Get("plugin_id")
snap.ID = query.Get("snapshot_id")
secrets := parseCSISecrets(req)
snap.Secrets = secrets
args.Snapshots = []*structs.CSISnapshot{snap}
var out structs.CSISnapshotDeleteResponse
if err := s.agent.RPC("CSIVolume.DeleteSnapshot", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) csiSnapshotList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
query := req.URL.Query()
args.PluginID = query.Get("plugin_id")
secrets := parseCSISecrets(req)
args.Secrets = secrets
var out structs.CSISnapshotListResponse
if err := s.agent.RPC("CSIVolume.ListSnapshots", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out, nil
}
// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
// Type filters plugin lists to a specific type. When support for non-CSI plugins is
// introduced, we'll need to dispatch here
query := req.URL.Query()
qtype, ok := query["type"]
if !ok {
return []*structs.CSIPluginListStub{}, nil
}
if qtype[0] != "csi" {
return nil, nil
}
args := structs.CSIPluginListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.CSIPluginListResponse
if err := s.agent.RPC("CSIPlugin.List", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Plugins, nil
}
// CSIPluginSpecificRequest list the job with CSIInfo
func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
// Tokenize the suffix of the path to get the plugin id
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/plugin/csi/")
tokens := strings.Split(reqSuffix, "/")
if len(tokens) > 2 || len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
}
id := tokens[0]
args := structs.CSIPluginGetRequest{ID: id}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.CSIPluginGetResponse
if err := s.agent.RPC("CSIPlugin.Get", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Plugin == nil {
return nil, CodedError(404, "plugin not found")
}
return out.Plugin, nil
}
// parseCSISecrets extracts a map of k/v pairs from the CSI secrets
// header. Silently ignores invalid secrets
func parseCSISecrets(req *http.Request) structs.CSISecrets {
secretsHeader := req.Header.Get("X-Nomad-CSI-Secrets")
if secretsHeader == "" {
return nil
}
secrets := map[string]string{}
secretkvs := strings.Split(secretsHeader, ",")
for _, secretkv := range secretkvs {
if key, value, found := strings.Cut(secretkv, "="); found {
secrets[key] = value
}
}
if len(secrets) == 0 {
return nil
}
return structs.CSISecrets(secrets)
}