mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Merge pull request #7012 from hashicorp/f-csi-volumes
Container Storage Interface Support
This commit is contained in:
@@ -6,11 +6,10 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/hcl"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs/config"
|
||||
)
|
||||
|
||||
@@ -110,49 +109,33 @@ func durations(xs []td) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeEqualFold removes the first string that EqualFold matches
|
||||
func removeEqualFold(xs *[]string, search string) {
|
||||
sl := *xs
|
||||
for i, x := range sl {
|
||||
if strings.EqualFold(x, search) {
|
||||
sl = append(sl[:i], sl[i+1:]...)
|
||||
if len(sl) == 0 {
|
||||
*xs = nil
|
||||
} else {
|
||||
*xs = sl
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func extraKeys(c *Config) error {
|
||||
// hcl leaves behind extra keys when parsing JSON. These keys
|
||||
// are kept on the top level, taken from slices or the keys of
|
||||
// structs contained in slices. Clean up before looking for
|
||||
// extra keys.
|
||||
for range c.HTTPAPIResponseHeaders {
|
||||
removeEqualFold(&c.ExtraKeysHCL, "http_api_response_headers")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "http_api_response_headers")
|
||||
}
|
||||
|
||||
for _, p := range c.Plugins {
|
||||
removeEqualFold(&c.ExtraKeysHCL, p.Name)
|
||||
removeEqualFold(&c.ExtraKeysHCL, "config")
|
||||
removeEqualFold(&c.ExtraKeysHCL, "plugin")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, p.Name)
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "config")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "plugin")
|
||||
}
|
||||
|
||||
for _, k := range []string{"options", "meta", "chroot_env", "servers", "server_join"} {
|
||||
removeEqualFold(&c.ExtraKeysHCL, k)
|
||||
removeEqualFold(&c.ExtraKeysHCL, "client")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "client")
|
||||
}
|
||||
|
||||
// stats is an unused key, continue to silently ignore it
|
||||
removeEqualFold(&c.Client.ExtraKeysHCL, "stats")
|
||||
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "stats")
|
||||
|
||||
// Remove HostVolume extra keys
|
||||
for _, hv := range c.Client.HostVolumes {
|
||||
removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
|
||||
removeEqualFold(&c.Client.ExtraKeysHCL, "host_volume")
|
||||
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
|
||||
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_volume")
|
||||
}
|
||||
|
||||
// Remove AuditConfig extra keys
|
||||
@@ -167,60 +150,14 @@ func extraKeys(c *Config) error {
|
||||
}
|
||||
|
||||
for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} {
|
||||
removeEqualFold(&c.ExtraKeysHCL, k)
|
||||
removeEqualFold(&c.ExtraKeysHCL, "server")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "server")
|
||||
}
|
||||
|
||||
for _, k := range []string{"datadog_tags"} {
|
||||
removeEqualFold(&c.ExtraKeysHCL, k)
|
||||
removeEqualFold(&c.ExtraKeysHCL, "telemetry")
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
|
||||
helper.RemoveEqualFold(&c.ExtraKeysHCL, "telemetry")
|
||||
}
|
||||
|
||||
return extraKeysImpl([]string{}, reflect.ValueOf(*c))
|
||||
}
|
||||
|
||||
// extraKeysImpl returns an error if any extraKeys array is not empty
|
||||
func extraKeysImpl(path []string, val reflect.Value) error {
|
||||
stype := val.Type()
|
||||
for i := 0; i < stype.NumField(); i++ {
|
||||
ftype := stype.Field(i)
|
||||
fval := val.Field(i)
|
||||
|
||||
name := ftype.Name
|
||||
prop := ""
|
||||
tagSplit(ftype, "hcl", &name, &prop)
|
||||
|
||||
if fval.Kind() == reflect.Ptr {
|
||||
fval = reflect.Indirect(fval)
|
||||
}
|
||||
|
||||
// struct? recurse. add the struct's key to the path
|
||||
if fval.Kind() == reflect.Struct {
|
||||
err := extraKeysImpl(append([]string{name}, path...), fval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if "unusedKeys" == prop {
|
||||
if ks, ok := fval.Interface().([]string); ok && len(ks) != 0 {
|
||||
return fmt.Errorf("%s unexpected keys %s",
|
||||
strings.Join(path, "."),
|
||||
strings.Join(ks, ", "))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// tagSplit reads the named tag from the structfield and splits its values into strings
|
||||
func tagSplit(field reflect.StructField, tagName string, vars ...*string) {
|
||||
tag := strings.Split(field.Tag.Get(tagName), ",")
|
||||
end := len(tag) - 1
|
||||
for i, s := range vars {
|
||||
if i > end {
|
||||
return
|
||||
}
|
||||
*s = tag[i]
|
||||
}
|
||||
return helper.UnusedKeys(c)
|
||||
}
|
||||
|
||||
200
command/agent/csi_endpoint.go
Normal file
200
command/agent/csi_endpoint.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const errRequiresType = "Missing required parameter type"
|
||||
|
||||
func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "GET" {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
// Type filters volume lists to a specific type. When support for non-CSI volumes is
|
||||
// introduced, we'll need to dispatch here
|
||||
query := req.URL.Query()
|
||||
qtype, ok := query["type"]
|
||||
if !ok {
|
||||
return nil, CodedError(400, errRequiresType)
|
||||
}
|
||||
if qtype[0] != "csi" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
args := structs.CSIVolumeListRequest{}
|
||||
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if plugin, ok := query["plugin_id"]; ok {
|
||||
args.PluginID = plugin[0]
|
||||
}
|
||||
if node, ok := query["node_id"]; ok {
|
||||
args.NodeID = node[0]
|
||||
}
|
||||
|
||||
var out structs.CSIVolumeListResponse
|
||||
if err := s.agent.RPC("CSIVolume.List", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
return out.Volumes, nil
|
||||
}
|
||||
|
||||
// CSIVolumeSpecificRequest dispatches GET and PUT
|
||||
func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Tokenize the suffix of the path to get the volume id
|
||||
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/volume/csi/")
|
||||
tokens := strings.Split(reqSuffix, "/")
|
||||
if len(tokens) > 2 || len(tokens) < 1 {
|
||||
return nil, CodedError(404, resourceNotFoundErr)
|
||||
}
|
||||
id := tokens[0]
|
||||
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
return s.csiVolumeGet(id, resp, req)
|
||||
case "PUT":
|
||||
return s.csiVolumePut(id, resp, req)
|
||||
case "DELETE":
|
||||
return s.csiVolumeDelete(id, resp, req)
|
||||
default:
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
args := structs.CSIVolumeGetRequest{
|
||||
ID: id,
|
||||
}
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.CSIVolumeGetResponse
|
||||
if err := s.agent.RPC("CSIVolume.Get", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
if out.Volume == nil {
|
||||
return nil, CodedError(404, "volume not found")
|
||||
}
|
||||
|
||||
return out.Volume, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "PUT" {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
args0 := structs.CSIVolumeRegisterRequest{}
|
||||
if err := decodeBody(req, &args0); err != nil {
|
||||
return err, CodedError(400, err.Error())
|
||||
}
|
||||
|
||||
args := structs.CSIVolumeRegisterRequest{
|
||||
Volumes: args0.Volumes,
|
||||
}
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
var out structs.CSIVolumeRegisterResponse
|
||||
if err := s.agent.RPC("CSIVolume.Register", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "DELETE" {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
args := structs.CSIVolumeDeregisterRequest{
|
||||
VolumeIDs: []string{id},
|
||||
}
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
var out structs.CSIVolumeDeregisterResponse
|
||||
if err := s.agent.RPC("CSIVolume.Deregister", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// CSIPluginsRequest lists CSI plugins
|
||||
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "GET" {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
// Type filters plugin lists to a specific type. When support for non-CSI plugins is
|
||||
// introduced, we'll need to dispatch here
|
||||
query := req.URL.Query()
|
||||
qtype, ok := query["type"]
|
||||
if !ok {
|
||||
return nil, CodedError(400, errRequiresType)
|
||||
}
|
||||
if qtype[0] != "csi" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
args := structs.CSIPluginListRequest{}
|
||||
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.CSIPluginListResponse
|
||||
if err := s.agent.RPC("CSIPlugin.List", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
return out.Plugins, nil
|
||||
}
|
||||
|
||||
// CSIPluginSpecificRequest list the job with CSIInfo
|
||||
func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "GET" {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
// Tokenize the suffix of the path to get the plugin id
|
||||
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/plugin/csi/")
|
||||
tokens := strings.Split(reqSuffix, "/")
|
||||
if len(tokens) > 2 || len(tokens) < 1 {
|
||||
return nil, CodedError(404, resourceNotFoundErr)
|
||||
}
|
||||
id := tokens[0]
|
||||
|
||||
args := structs.CSIPluginGetRequest{ID: id}
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.CSIPluginGetResponse
|
||||
if err := s.agent.RPC("CSIPlugin.Get", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setMeta(resp, &out.QueryMeta)
|
||||
if out.Plugin == nil {
|
||||
return nil, CodedError(404, "plugin not found")
|
||||
}
|
||||
|
||||
return out.Plugin, nil
|
||||
}
|
||||
@@ -263,6 +263,11 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.mux.HandleFunc("/v1/deployments", s.wrap(s.DeploymentsRequest))
|
||||
s.mux.HandleFunc("/v1/deployment/", s.wrap(s.DeploymentSpecificRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/volumes", s.wrap(s.CSIVolumesRequest))
|
||||
s.mux.HandleFunc("/v1/volume/csi/", s.wrap(s.CSIVolumeSpecificRequest))
|
||||
s.mux.HandleFunc("/v1/plugins", s.wrap(s.CSIPluginsRequest))
|
||||
s.mux.HandleFunc("/v1/plugin/csi/", s.wrap(s.CSIPluginSpecificRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/acl/policies", s.wrap(s.ACLPoliciesRequest))
|
||||
s.mux.HandleFunc("/v1/acl/policy/", s.wrap(s.ACLPolicySpecificRequest))
|
||||
|
||||
|
||||
@@ -749,8 +749,9 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
|
||||
if l := len(taskGroup.Volumes); l != 0 {
|
||||
tg.Volumes = make(map[string]*structs.VolumeRequest, l)
|
||||
for k, v := range taskGroup.Volumes {
|
||||
if v.Type != structs.VolumeTypeHost {
|
||||
// Ignore non-host volumes in this iteration currently.
|
||||
if v.Type != structs.VolumeTypeHost && v.Type != structs.VolumeTypeCSI {
|
||||
// Ignore volumes we don't understand in this iteration currently.
|
||||
// - This is because we don't currently have a way to return errors here.
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -761,6 +762,13 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
|
||||
Source: v.Source,
|
||||
}
|
||||
|
||||
if v.MountOptions != nil {
|
||||
vol.MountOptions = &structs.CSIMountOptions{
|
||||
FSType: v.MountOptions.FSType,
|
||||
MountFlags: v.MountOptions.MountFlags,
|
||||
}
|
||||
}
|
||||
|
||||
tg.Volumes[k] = vol
|
||||
}
|
||||
}
|
||||
@@ -812,6 +820,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
|
||||
structsTask.Kind = structs.TaskKind(apiTask.Kind)
|
||||
structsTask.Constraints = ApiConstraintsToStructs(apiTask.Constraints)
|
||||
structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities)
|
||||
structsTask.CSIPluginConfig = ApiCSIPluginConfigToStructsCSIPluginConfig(apiTask.CSIPluginConfig)
|
||||
|
||||
if l := len(apiTask.VolumeMounts); l != 0 {
|
||||
structsTask.VolumeMounts = make([]*structs.VolumeMount, l)
|
||||
@@ -933,6 +942,18 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
|
||||
}
|
||||
}
|
||||
|
||||
func ApiCSIPluginConfigToStructsCSIPluginConfig(apiConfig *api.TaskCSIPluginConfig) *structs.TaskCSIPluginConfig {
|
||||
if apiConfig == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
sc := &structs.TaskCSIPluginConfig{}
|
||||
sc.ID = apiConfig.ID
|
||||
sc.Type = structs.CSIPluginType(apiConfig.Type)
|
||||
sc.MountDir = apiConfig.MountDir
|
||||
return sc
|
||||
}
|
||||
|
||||
func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
|
||||
if in == nil {
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user