dynamic host volumes: fingerprint client plugins (#24589)

Daniel Bennett authored on 2024-12-02 16:27:10 -05:00; committed by Tim Gross
parent df258ac02a
commit 46a39560bb
13 changed files with 359 additions and 71 deletions

View File

@@ -34,7 +34,7 @@ import (
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/hoststats"
"github.com/hashicorp/nomad/client/hostvolumemanager"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/numalib"
@@ -290,7 +290,7 @@ type Client struct {
// drivermanager is responsible for managing driver plugins
drivermanager drivermanager.Manager
hostVolumeManager *hostvolumemanager.HostVolumeManager
hostVolumeManager *hvm.HostVolumeManager
// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
@@ -535,7 +535,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)
c.hostVolumeManager = hostvolumemanager.NewHostVolumeManager(cfg.AllocMountsDir, logger)
c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the

View File

@@ -314,6 +314,9 @@ type Config struct {
// HostVolumes is a map of the configured host volumes by name.
HostVolumes map[string]*structs.ClientHostVolumeConfig
// HostVolumePluginDir is the directory with dynamic host volume plugins.
HostVolumePluginDir string
// HostNetworks is a map of the configured host networks by name.
HostNetworks map[string]*structs.ClientHostNetworkConfig

View File

@@ -0,0 +1,120 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package fingerprint
import (
"context"
"os"
"strings"
"sync"
"time"
"github.com/hashicorp/go-hclog"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/helper"
)
func NewPluginsHostVolumeFingerprint(logger hclog.Logger) Fingerprint {
return &DynamicHostVolumePluginFingerprint{
logger: logger.Named("host_volume_plugins"),
}
}
var _ ReloadableFingerprint = &DynamicHostVolumePluginFingerprint{}
type DynamicHostVolumePluginFingerprint struct {
logger hclog.Logger
}
func (h *DynamicHostVolumePluginFingerprint) Reload() {
// host volume plugins are re-detected on agent reload
}
func (h *DynamicHostVolumePluginFingerprint) Fingerprint(request *FingerprintRequest, response *FingerprintResponse) error {
// always add "mkdir" plugin
h.logger.Debug("detected plugin built-in",
"plugin_id", hvm.HostVolumePluginMkdirID, "version", hvm.HostVolumePluginMkdirVersion)
defer response.AddAttribute("plugins.host_volume.version."+hvm.HostVolumePluginMkdirID, hvm.HostVolumePluginMkdirVersion)
response.Detected = true
// this config value will be empty in -dev mode
pluginDir := request.Config.HostVolumePluginDir
if pluginDir == "" {
return nil
}
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir)
if err != nil {
if os.IsNotExist(err) {
h.logger.Debug("plugin dir does not exist", "dir", pluginDir)
} else {
h.logger.Warn("error finding plugins", "dir", pluginDir, "error", err)
}
return nil // don't halt agent start
}
// if this was a reload, wipe what was there before
for k := range request.Node.Attributes {
if strings.HasPrefix(k, "plugins.host_volume.") {
response.RemoveAttribute(k)
}
}
// set the attribute(s)
for plugin, version := range plugins {
h.logger.Debug("detected plugin", "plugin_id", plugin, "version", version)
response.AddAttribute("plugins.host_volume.version."+plugin, version)
}
return nil
}
func (h *DynamicHostVolumePluginFingerprint) Periodic() (bool, time.Duration) {
return false, 0
}
// GetHostVolumePluginVersions finds all the executable files on disk
// that respond to a Version call (arg $1 = 'version' / env $OPERATION = 'version').
// The returned map's keys are plugin IDs, and the values are version strings.
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string]string, error) {
files, err := helper.FindExecutableFiles(pluginDir)
if err != nil {
return nil, err
}
plugins := make(map[string]string)
mut := sync.Mutex{}
var wg sync.WaitGroup
for file, fullPath := range files {
wg.Add(1)
go func(file, fullPath string) {
defer wg.Done()
// really should take way less than a second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
log := log.With("plugin_id", file)
p, err := hvm.NewHostVolumePluginExternal(log, file, fullPath, "")
if err != nil {
log.Warn("error getting plugin", "error", err)
return
}
version, err := p.Version(ctx)
if err != nil {
log.Debug("failed to get version from plugin", "error", err)
return
}
mut.Lock()
plugins[file] = version.String()
mut.Unlock()
}(file, fullPath)
}
wg.Wait()
return plugins, nil
}
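For reference, the version probe above only needs a plugin executable to print a version string when asked for the 'version' operation (passed as arg $1 / env $OPERATION per the comment on GetHostVolumePluginVersions). A minimal stand-alone sketch of such a plugin, written in Go purely for illustration (the tests below use shell scripts), might look like this; the plugin name and behavior are hypothetical:

// hypothetical plugin binary, not part of this change
package main

import (
	"fmt"
	"os"
)

func main() {
	op := os.Getenv("OPERATION")
	if len(os.Args) > 1 {
		op = os.Args[1]
	}
	switch op {
	case "version":
		fmt.Println("0.0.1") // any parseable version string
	default:
		// create/delete/etc. would be handled here
		fmt.Fprintf(os.Stderr, "unsupported operation: %q\n", op)
		os.Exit(1)
	}
}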

View File

@@ -0,0 +1,89 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package fingerprint
import (
"os"
"path/filepath"
"runtime"
"testing"
"github.com/hashicorp/nomad/client/config"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)
// this is more of a full integration test of:
// fingerprint <- find plugins <- find executables
func TestPluginsHostVolumeFingerprint(t *testing.T) {
cfg := &config.Config{HostVolumePluginDir: ""}
node := &structs.Node{Attributes: map[string]string{}}
req := &FingerprintRequest{Config: cfg, Node: node}
fp := NewPluginsHostVolumeFingerprint(testlog.HCLogger(t))
// this fingerprint is not mandatory, so no error should be returned
for name, path := range map[string]string{
"empty": "",
"non-existent": "/nowhere",
"impossible": "dynamic_host_volumes_test.go",
} {
t.Run(name, func(t *testing.T) {
resp := FingerprintResponse{}
cfg.HostVolumePluginDir = path
err := fp.Fingerprint(req, &resp)
must.NoError(t, err)
must.True(t, resp.Detected) // always true due to "mkdir" built-in
})
}
if runtime.GOOS == "windows" {
t.Skip("test scripts not built for windows") // db TODO(1.10.0)
}
// happy path: dir exists. this one will contain a single valid plugin.
tmp := t.TempDir()
cfg.HostVolumePluginDir = tmp
files := []struct {
name string
contents string
perm os.FileMode
}{
// only this first one should be detected as a valid plugin
{"happy-plugin", "#!/usr/bin/env sh\necho '0.0.1'", 0700},
{"not-a-plugin", "#!/usr/bin/env sh\necho 'not-a-version'", 0700},
{"unhappy-plugin", "#!/usr/bin/env sh\necho '0.0.2'; exit 1", 0700},
{"not-executable", "hello", 0400},
}
for _, f := range files {
must.NoError(t, os.WriteFile(filepath.Join(tmp, f.name), []byte(f.contents), f.perm))
}
// directories should be ignored
must.NoError(t, os.Mkdir(filepath.Join(tmp, "a-directory"), 0700))
// do the fingerprint
resp := FingerprintResponse{}
err := fp.Fingerprint(req, &resp)
must.NoError(t, err)
must.Eq(t, map[string]string{
"plugins.host_volume.version.happy-plugin": "0.0.1",
"plugins.host_volume.version.mkdir": hvm.HostVolumePluginMkdirVersion, // built-in
}, resp.Attributes)
// do it again after deleting our one good plugin.
// repeat runs should wipe attributes, so nothing should remain.
node.Attributes = resp.Attributes
must.NoError(t, os.Remove(filepath.Join(tmp, "happy-plugin")))
resp = FingerprintResponse{}
err = fp.Fingerprint(req, &resp)
must.NoError(t, err)
must.Eq(t, map[string]string{
"plugins.host_volume.version.happy-plugin": "", // empty value means removed
"plugins.host_volume.version.mkdir": hvm.HostVolumePluginMkdirVersion, // built-in
}, resp.Attributes)
}

View File

@@ -32,19 +32,20 @@ var (
// hostFingerprinters contains the host fingerprints which are available for a
// given platform.
hostFingerprinters = map[string]Factory{
"arch": NewArchFingerprint,
"consul": NewConsulFingerprint,
"cni": NewCNIFingerprint, // networks
"cpu": NewCPUFingerprint,
"host": NewHostFingerprint,
"landlock": NewLandlockFingerprint,
"memory": NewMemoryFingerprint,
"network": NewNetworkFingerprint,
"nomad": NewNomadFingerprint,
"plugins_cni": NewPluginsCNIFingerprint,
"signal": NewSignalFingerprint,
"storage": NewStorageFingerprint,
"vault": NewVaultFingerprint,
"arch": NewArchFingerprint,
"consul": NewConsulFingerprint,
"cni": NewCNIFingerprint, // networks
"cpu": NewCPUFingerprint,
"host": NewHostFingerprint,
"landlock": NewLandlockFingerprint,
"memory": NewMemoryFingerprint,
"network": NewNetworkFingerprint,
"nomad": NewNomadFingerprint,
"plugins_cni": NewPluginsCNIFingerprint,
"host_volume_plugins": NewPluginsHostVolumeFingerprint,
"signal": NewSignalFingerprint,
"storage": NewStorageFingerprint,
"vault": NewVaultFingerprint,
}
// envFingerprinters contains the fingerprints that are environment specific.

View File

@@ -8,7 +8,7 @@ import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/hostvolumemanager"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/shoenig/test/must"
@@ -22,8 +22,8 @@ func TestHostVolume(t *testing.T) {
tmp := t.TempDir()
expectDir := filepath.Join(tmp, "test-vol-id")
hvm := hostvolumemanager.NewHostVolumeManager(tmp, testlog.HCLogger(t))
client.hostVolumeManager = hvm
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", tmp)
t.Run("happy", func(t *testing.T) {
req := &cstructs.ClientHostVolumeCreateRequest{
@@ -60,19 +60,20 @@ func TestHostVolume(t *testing.T) {
}
var resp cstructs.ClientHostVolumeCreateResponse
err := client.ClientRPC("HostVolume.Create", req, &resp)
must.EqError(t, err, `no such plugin "non-existent"`)
must.EqError(t, err, `no such plugin: "non-existent"`)
delReq := &cstructs.ClientHostVolumeDeleteRequest{
PluginID: "non-existent",
}
var delResp cstructs.ClientHostVolumeDeleteResponse
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
must.EqError(t, err, `no such plugin "non-existent"`)
must.EqError(t, err, `no such plugin: "non-existent"`)
})
t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager = hostvolumemanager.NewHostVolumeManager("host_volume_endpoint_test.go", testlog.HCLogger(t))
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", "host_volume_endpoint_test.go")
req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",

View File

@@ -34,6 +34,9 @@ type HostVolumePluginCreateResponse struct {
Context map[string]string `json:"context"` // metadata
}
const HostVolumePluginMkdirID = "mkdir"
const HostVolumePluginMkdirVersion = "0.0.1"
var _ HostVolumePlugin = &HostVolumePluginMkdir{}
type HostVolumePluginMkdir struct {
@@ -44,7 +47,7 @@ type HostVolumePluginMkdir struct {
}
func (p *HostVolumePluginMkdir) Version(_ context.Context) (*version.Version, error) {
return version.NewVersion("0.0.1")
return version.NewVersion(HostVolumePluginMkdirVersion)
}
func (p *HostVolumePluginMkdir) Create(_ context.Context,
@@ -91,6 +94,29 @@ func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHo
var _ HostVolumePlugin = &HostVolumePluginExternal{}
func NewHostVolumePluginExternal(log hclog.Logger,
id, executable, targetPath string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
f, err := os.Stat(executable)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, id)
}
return nil, err
}
if !helper.IsExecutable(f) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, id)
}
return &HostVolumePluginExternal{
ID: id,
Executable: executable,
TargetPath: targetPath,
log: log,
}, nil
}
type HostVolumePluginExternal struct {
ID string
Executable string

View File

@@ -5,60 +5,57 @@ package hostvolumemanager
import (
"context"
"fmt"
"sync"
"errors"
"path/filepath"
"github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
)
var (
ErrPluginNotExists = errors.New("no such plugin")
ErrPluginNotExecutable = errors.New("plugin not executable")
)
type HostVolumeManager struct {
log hclog.Logger
plugins *sync.Map
pluginDir string
sharedMountDir string
log hclog.Logger
}
func NewHostVolumeManager(sharedMountDir string, logger hclog.Logger) *HostVolumeManager {
log := logger.Named("host_volumes")
func NewHostVolumeManager(logger hclog.Logger, pluginDir, sharedMountDir string) *HostVolumeManager {
log := logger.Named("host_volume_mgr")
mgr := &HostVolumeManager{
log: log,
plugins: &sync.Map{},
// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
return &HostVolumeManager{
log: log,
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
}
// db TODO(1.10.0): discover plugins on disk, need a new plugin dir
// TODO: how do we define the external mounter plugins? plugin configs?
mgr.setPlugin("mkdir", &HostVolumePluginMkdir{
ID: "mkdir",
TargetPath: sharedMountDir,
log: log.With("plugin_id", "mkdir"),
})
mgr.setPlugin("example-host-volume", &HostVolumePluginExternal{
ID: "example-host-volume",
Executable: "/opt/nomad/hostvolumeplugins/example-host-volume",
TargetPath: sharedMountDir,
log: log.With("plugin_id", "example-host-volume"),
})
return mgr
}
// db TODO(1.10.0): fingerprint elsewhere / on sighup, and SetPlugin from afar?
func (hvm *HostVolumeManager) setPlugin(id string, plug HostVolumePlugin) {
hvm.plugins.Store(id, plug)
}
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
log := hvm.log.With("plugin_id", id)
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, bool) {
obj, ok := hvm.plugins.Load(id)
if !ok {
return nil, false
if id == HostVolumePluginMkdirID {
return &HostVolumePluginMkdir{
ID: HostVolumePluginMkdirID,
TargetPath: hvm.sharedMountDir,
log: log,
}, nil
}
return obj.(HostVolumePlugin), true
path := filepath.Join(hvm.pluginDir, id)
return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir)
}
func (hvm *HostVolumeManager) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {
plug, ok := hvm.getPlugin(req.PluginID)
if !ok {
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}
pluginResp, err := plug.Create(ctx, req)
@@ -80,12 +77,12 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
func (hvm *HostVolumeManager) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {
plug, ok := hvm.getPlugin(req.PluginID)
if !ok {
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}
err := plug.Delete(ctx, req)
err = plug.Delete(ctx, req)
if err != nil {
return nil, err
}
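Taken together, getPlugin now resolves the built-in "mkdir" plugin by ID and treats any other ID as an executable at <pluginDir>/<id>, returning ErrPluginNotExists / ErrPluginNotExecutable instead of a boolean. A minimal usage sketch of the manager, mirroring the endpoint tests above (the paths are hypothetical and not part of this change):

package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/go-hclog"
	hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
	cstructs "github.com/hashicorp/nomad/client/structs"
)

func main() {
	mgr := hvm.NewHostVolumeManager(hclog.Default(),
		"/opt/nomad/host_volume_plugins", // plugin dir (hypothetical)
		"/opt/nomad/alloc_mounts")        // shared mount dir (hypothetical)

	// "mkdir" is always available; any other PluginID must exist as an
	// executable file in the plugin dir, or Create returns an error.
	resp, err := mgr.Create(context.Background(), &cstructs.ClientHostVolumeCreateRequest{
		ID:       "test-vol-id",
		PluginID: "mkdir",
	})
	if err != nil {
		fmt.Println("create failed:", err)
		return
	}
	fmt.Printf("created: %+v\n", resp)
}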

View File

@@ -724,6 +724,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
if agentConfig.DataDir != "" {
conf.StateDir = filepath.Join(agentConfig.DataDir, "client")
conf.AllocDir = filepath.Join(agentConfig.DataDir, "alloc")
conf.HostVolumePluginDir = filepath.Join(agentConfig.DataDir, "host_volume_plugins")
dataParent := filepath.Dir(agentConfig.DataDir)
conf.AllocMountsDir = filepath.Join(dataParent, "alloc_mounts")
}
@@ -736,6 +737,9 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
if agentConfig.Client.AllocMountsDir != "" {
conf.AllocMountsDir = agentConfig.Client.AllocMountsDir
}
if agentConfig.Client.HostVolumePluginDir != "" {
conf.HostVolumePluginDir = agentConfig.Client.HostVolumePluginDir
}
if agentConfig.Client.NetworkInterface != "" {
conf.NetworkInterface = agentConfig.Client.NetworkInterface
}

View File

@@ -111,6 +111,7 @@ func (c *Command) readConfig() *Config {
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
flags.StringVar(&cmdConfig.Client.AllocDir, "alloc-dir", "", "")
flags.StringVar(&cmdConfig.Client.AllocMountsDir, "alloc-mounts-dir", "", "")
flags.StringVar(&cmdConfig.Client.HostVolumePluginDir, "host-volume-plugin-dir", "", "")
flags.StringVar(&cmdConfig.Client.NodeClass, "node-class", "", "")
flags.StringVar(&cmdConfig.Client.NodePool, "node-pool", "", "")
flags.StringVar(&servers, "servers", "", "")
@@ -384,11 +385,12 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
// Verify the paths are absolute.
dirs := map[string]string{
"data-dir": config.DataDir,
"plugin-dir": config.PluginDir,
"alloc-dir": config.Client.AllocDir,
"alloc-mounts-dir": config.Client.AllocMountsDir,
"state-dir": config.Client.StateDir,
"data-dir": config.DataDir,
"plugin-dir": config.PluginDir,
"alloc-dir": config.Client.AllocDir,
"alloc-mounts-dir": config.Client.AllocMountsDir,
"host-volume-plugin-dir": config.Client.HostVolumePluginDir,
"state-dir": config.Client.StateDir,
}
for k, dir := range dirs {
if dir == "" {
@@ -735,6 +737,7 @@ func (c *Command) AutocompleteFlags() complete.Flags {
"-region": complete.PredictAnything,
"-data-dir": complete.PredictDirs("*"),
"-plugin-dir": complete.PredictDirs("*"),
"-host-volume-plugin-dir": complete.PredictDirs("*"),
"-dc": complete.PredictAnything,
"-log-level": complete.PredictAnything,
"-json-logs": complete.PredictNothing,
@@ -1568,6 +1571,10 @@ Client Options:
The default speed for network interfaces in MBits if the link speed can not
be determined dynamically.
-host-volume-plugin-dir
Directory containing dynamic host volume plugins. The default is
<data-dir>/host_volume_plugins.
ACL Options:
-acl-enabled

View File

@@ -229,6 +229,10 @@ type ClientConfig struct {
// AllocMountsDir is the directory for storing mounts into allocation data
AllocMountsDir string `hcl:"alloc_mounts_dir"`
// HostVolumePluginDir is the directory that contains dynamic host volume plugins
// db TODO(1.10.0): document default directory is alongside alloc_mounts
HostVolumePluginDir string `hcl:"host_volume_plugin_dir"`
// Servers is a list of known server addresses. These are as "host:port"
Servers []string `hcl:"servers"`
@@ -2316,6 +2320,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.AllocMountsDir != "" {
result.AllocMountsDir = b.AllocMountsDir
}
if b.HostVolumePluginDir != "" {
result.HostVolumePluginDir = b.HostVolumePluginDir
}
if b.NodeClass != "" {
result.NodeClass = b.NodeClass
}

View File

@@ -53,8 +53,8 @@ if [[ "$OSTYPE" == "linux-"* ]]; then
ext=ext4
mount=/usr/bin/mount
mkfsExec() {
dd if=/dev/zero of="$1".$ext bs=1M count="$2"
mkfs.ext4 "$1".$ext 1>&2
dd if=/dev/zero of="$1".$ext bs=1M count="$2"
mkfs.ext4 "$1".$ext 1>&2
}
mountExec() {
$mount "$1".$ext "$1"
@@ -76,7 +76,7 @@ elif [[ "$OSTYPE" == "darwin"* ]]; then
}
else
echo "$OSTYPE is an unsupported OS"
return 1
exit 1
fi
validate_path() {
@@ -88,7 +88,7 @@ validate_path() {
}
is_mounted() {
$mount | grep -q "^$1$"
$mount | grep -q " $1 "
}
create_volume() {

View File

@@ -9,6 +9,7 @@ import (
"maps"
"math"
"net/http"
"os"
"path/filepath"
"reflect"
"regexp"
@@ -542,3 +543,33 @@ func FlattenMultierror(err error) error {
}
return mErr.ErrorOrNil()
}
// FindExecutableFiles looks in the provided path for executables and returns
// a map where keys are file names and values are their absolute paths.
func FindExecutableFiles(path string) (map[string]string, error) {
executables := make(map[string]string)
entries, err := os.ReadDir(path)
if err != nil {
return executables, err
}
for _, e := range entries {
i, err := e.Info()
if err != nil {
return executables, err
}
if !IsExecutable(i) {
continue
}
p := filepath.Join(path, i.Name())
abs, err := filepath.Abs(p)
if err != nil {
return executables, err
}
executables[i.Name()] = abs
}
return executables, nil
}
func IsExecutable(i os.FileInfo) bool {
return !i.IsDir() && i.Mode()&0o111 != 0
}
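A quick usage sketch of the new helper (the directory path is hypothetical): subdirectories and files without an executable bit are skipped, and the result maps file names to absolute paths.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	execs, err := helper.FindExecutableFiles("/opt/nomad/host_volume_plugins") // hypothetical path
	if err != nil {
		fmt.Println("reading plugin dir:", err)
		return
	}
	for name, abs := range execs {
		fmt.Printf("%s -> %s\n", name, abs)
	}
}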