Files
nomad/client/hostvolumemanager/host_volume_plugin.go
James Rasell e158356dd2 client: Remove created directory when mkdir plugin fails to chown. (#26194)
The mkdir plugin creates the directory and then chowns it. In the
event the chown command fails, we should attempt to remove the
directory. Without this, we leave directories on the client in
partial failure situations.
2025-07-04 08:36:36 +01:00

456 lines
15 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package hostvolumemanager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
)
const (
// environment variables for external plugins
EnvOperation = "DHV_OPERATION"
EnvVolumesDir = "DHV_VOLUMES_DIR"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvCreatedPath = "DHV_CREATED_PATH"
EnvNamespace = "DHV_NAMESPACE"
EnvVolumeName = "DHV_VOLUME_NAME"
EnvVolumeID = "DHV_VOLUME_ID"
EnvNodeID = "DHV_NODE_ID"
EnvNodePool = "DHV_NODE_POOL"
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
EnvParameters = "DHV_PARAMETERS"
)
// HostVolumePlugin manages the lifecycle of volumes.
type HostVolumePlugin interface {
Fingerprint(ctx context.Context) (*PluginFingerprint, error)
Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error)
Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error
}
// PluginFingerprint gets set on the node for volume scheduling.
// Plugins are expected to respond to 'fingerprint' calls with json that
// unmarshals to this struct.
type PluginFingerprint struct {
Version *version.Version `json:"version"`
}
// HostVolumePluginCreateResponse returns values to the server that may be shown
// to the user or stored in server state. Plugins are expected to respond to
// 'create' calls with json that unmarshals to this struct.
type HostVolumePluginCreateResponse struct {
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
Error string `json:"error"`
}
// HostVolumePluginDeleteResponse returns values to the server that may be shown
// to the user or stored in server state. Plugins are expected to respond to
// 'delete' calls with json that unmarshals to this struct.
type HostVolumePluginDeleteResponse struct {
Error string `json:"error"`
}
const HostVolumePluginMkdirID = "mkdir"
const HostVolumePluginMkdirVersion = "0.0.1"
// HostVolumePluginMkdirParams represents the parameters{} that the "mkdir"
// plugin will accept.
type HostVolumePluginMkdirParams struct {
Uid int
Gid int
Mode os.FileMode
}
var _ HostVolumePlugin = &HostVolumePluginMkdir{}
// HostVolumePluginMkdir is a plugin that creates a directory within the
// specified VolumesDir. It is built-in to Nomad, so is always available.
type HostVolumePluginMkdir struct {
ID string
VolumesDir string
log hclog.Logger
}
func (p *HostVolumePluginMkdir) Fingerprint(_ context.Context) (*PluginFingerprint, error) {
v, err := version.NewVersion(HostVolumePluginMkdirVersion)
return &PluginFingerprint{
Version: v,
}, err
}
func (p *HostVolumePluginMkdir) Create(_ context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "create",
"volume_id", req.ID,
"path", path)
log.Debug("running plugin")
resp := &HostVolumePluginCreateResponse{
Path: path,
// "mkdir" volumes, being simple directories, have unrestricted size
SizeBytes: 0,
}
if _, err := os.Stat(path); err == nil {
// already exists
return resp, nil
} else if !os.IsNotExist(err) {
// doesn't exist, but some other path error
log.Error("error with path", "error", err)
return nil, err
}
params, err := decodeMkdirParams(req.Parameters)
if err != nil {
log.Error("error with parameters", "error", err)
return nil, err
}
err = os.MkdirAll(path, params.Mode)
if err != nil {
log.Error("error creating directory", "error", err)
return nil, fmt.Errorf("error creating directory: %w", err)
}
// Chown note: A uid or gid of -1 means to not change that value.
if err = os.Chown(path, params.Uid, params.Gid); err != nil {
log.Error("error changing owner/group", "error", err, "uid", params.Uid, "gid", params.Gid)
// Failing to change ownership is fatal for this plugin. Since we have
// already created the directory, we should attempt to clean it.
// Otherwise, the operator must do this manually.
if err := os.RemoveAll(path); err != nil {
log.Error("failed to remove directory after create failure",
"error", err)
}
return nil, fmt.Errorf("error changing owner/group: %w", err)
}
log.Debug("plugin ran successfully")
return resp, nil
}
func decodeMkdirParams(in map[string]string) (HostVolumePluginMkdirParams, error) {
// default values if their associated keys are not in the input map
out := HostVolumePluginMkdirParams{
Mode: 0o700, // "0700"
Uid: -1, // this default translates to "do not set" in os.Chown
Gid: -1, // ditto
}
var err error
for param, val := range in {
switch param {
case "mode":
// mode needs special treatment - it's octal. note that this does
// not check whether it's a *reasonable* mode for a directory.
// that will be discovered during MkdirAll and subsequent usage
// by workloads (which we cannot predict).
var number uint64
number, err = strconv.ParseUint(val, 8, 32)
out.Mode = os.FileMode(number)
case "uid":
out.Uid, err = strconv.Atoi(val)
case "gid":
out.Gid, err = strconv.Atoi(val)
default:
err = fmt.Errorf("unknown mkdir parameter: %q", param)
}
if err != nil {
return out, fmt.Errorf("invalid value for %q: %w", param, err)
}
}
return out, nil
}
func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "delete",
"volume_id", req.ID,
"path", path)
log.Debug("running plugin")
err := os.RemoveAll(path)
if err != nil {
log.Error("error with plugin", "error", err)
return err
}
log.Debug("plugin ran successfully")
return nil
}
var _ HostVolumePlugin = &HostVolumePluginExternal{}
// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
pluginDir, filename, volumesDir, nodePool string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
executable := filepath.Join(pluginDir, filename)
f, err := os.Stat(executable)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, filename)
}
return nil, err
}
if !helper.IsExecutable(f) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, filename)
}
return &HostVolumePluginExternal{
ID: filename,
Executable: executable,
VolumesDir: volumesDir,
PluginDir: pluginDir,
NodePool: nodePool,
log: log,
}, nil
}
// HostVolumePluginExternal calls an executable on disk. All operations
// *must* be idempotent, and safe to be called concurrently per volume.
// For each call, the executable's stdout and stderr may be logged, so plugin
// authors should not include any sensitive information in their plugin outputs.
type HostVolumePluginExternal struct {
ID string
Executable string
VolumesDir string
PluginDir string
NodePool string
log hclog.Logger
}
// Fingerprint calls the executable with the following parameters:
// arguments: $1=fingerprint
// environment:
// - DHV_OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
// The version value should be a valid version number as allowed by
// version.NewVersion()
//
// Must complete within 5 seconds
func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFingerprint, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, p.Executable, "fingerprint")
cmd.Env = []string{EnvOperation + "=fingerprint"}
stdout, stderr, err := runCommand(cmd)
log := p.log.With(
"operation", "fingerprint",
"stdout", string(stdout),
"stderr", string(stderr),
)
if err != nil {
log.Error("error running plugin", "error", err)
return nil, fmt.Errorf("error fingerprinting plugin %q: %w", p.ID, err)
}
fprint := &PluginFingerprint{}
if err := json.Unmarshal(stdout, fprint); err != nil {
err = fmt.Errorf("error parsing fingerprint output as json: %w", err)
log.Error("error parsing plugin output", "error", err)
return nil, err
}
return fprint, nil
}
// Create calls the executable with the following parameters:
// arguments: $1=create
// environment:
// - DHV_OPERATION=create
// - DHV_VOLUMES_DIR={directory to put the volume in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec, expressed in bytes}
// - DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec, expressed in bytes}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": "/path/that/was/created", "bytes": 50000000}
// "path" must be provided to confirm the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
params, err := json.Marshal(req.Parameters)
if err != nil {
// should never happen; req.Parameters is a simple map[string]string
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvOperation, "create"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
var pluginResp HostVolumePluginCreateResponse
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
stdout, _, err := p.runPlugin(ctx, log, "create", envVars)
if err != nil {
jsonErr := json.Unmarshal(stdout, &pluginResp)
if jsonErr != nil {
// if we got an error, we can't actually count on getting JSON, so
// optimistically look for it and return the original error
// otherwise
return nil, fmt.Errorf(
"error creating volume %q with plugin %q: %w", req.ID, p.ID, err)
}
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w: %s",
req.ID, p.ID, err, pluginResp.Error)
}
err = json.Unmarshal(stdout, &pluginResp)
if err != nil {
// note: if a plugin does not return valid json, a volume may be
// created without any respective state in Nomad, since we return
// an error here after the plugin has done who-knows-what.
return nil, err
}
return &pluginResp, nil
}
// Delete calls the executable with the following parameters:
// arguments: $1=delete
// environment:
// - DHV_OPERATION=delete
// - DHV_CREATED_PATH={path that `create` returned}
// - DHV_VOLUMES_DIR={directory that volumes should be put in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response on stdout is discarded.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) error {
params, err := json.Marshal(req.Parameters)
if err != nil {
// should never happen; req.Parameters is a simple map[string]string
return fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvOperation, "delete"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// from create response
fmt.Sprintf("%s=%s", EnvCreatedPath, req.HostPath),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
stdout, _, err := p.runPlugin(ctx, log, "delete", envVars)
if err != nil {
var pluginResp HostVolumePluginDeleteResponse
jsonErr := json.Unmarshal(stdout, &pluginResp)
if jsonErr != nil {
// if we got an error, we can't actually count on getting JSON, so
// optimistically look for it and return the original error
// otherwise
return fmt.Errorf("error reading plugin response when deleting volume %q with plugin %q: original error: %w", req.ID, p.ID, err)
}
return fmt.Errorf("error deleting volume %q with plugin %q: %w: %s",
req.ID, p.ID, err, pluginResp.Error)
}
return nil
}
// runPlugin executes the... executable
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, log hclog.Logger,
op string, env []string) (stdout, stderr []byte, err error) {
log = log.With("operation", op)
log.Debug("running plugin")
// set up plugin execution
cmd := exec.CommandContext(ctx, p.Executable, op)
cmd.Env = env
stdout, stderr, err = runCommand(cmd)
log = log.With(
"stdout", string(stdout),
"stderr", string(stderr),
)
if err != nil {
log.Error("error with plugin", "error", err)
return stdout, stderr, err
}
log.Debug("plugin ran successfully")
return stdout, stderr, nil
}
// runCommand executes the provided Cmd and captures stdout and stderr.
func runCommand(cmd *exec.Cmd) (stdout, stderr []byte, err error) {
var errBuf bytes.Buffer
cmd.Stderr = io.Writer(&errBuf)
mErr := &multierror.Error{}
stdout, err = cmd.Output()
if err != nil {
mErr = multierror.Append(mErr, err)
}
stderr, err = io.ReadAll(&errBuf)
if err != nil {
mErr = multierror.Append(mErr, err)
}
return stdout, stderr, helper.FlattenMultierror(mErr.ErrorOrNil())
}