Merge pull request #4933 from hashicorp/f-mount-device

Mount Devices in container based drivers
This commit is contained in:
Mahmood Ali
2018-12-07 10:32:03 -05:00
committed by GitHub
12 changed files with 576 additions and 33 deletions

View File

@@ -356,6 +356,28 @@ type DockerDevice struct {
CgroupPermissions string `codec:"cgroup_permissions"`
}
func (d DockerDevice) toDockerDevice() (docker.Device, error) {
dd := docker.Device{
PathOnHost: d.HostPath,
PathInContainer: d.ContainerPath,
CgroupPermissions: d.CgroupPermissions,
}
if d.HostPath == "" {
return dd, fmt.Errorf("host path must be set in configuration for devices")
}
if dd.CgroupPermissions == "" {
dd.CgroupPermissions = "rwm"
}
if !validateCgroupPermission(dd.CgroupPermissions) {
return dd, fmt.Errorf("invalid cgroup permission string: %q", dd.CgroupPermissions)
}
return dd, nil
}
type DockerLogging struct {
Type string `codec:"type"`
Config map[string]string `codec:"config"`

View File

@@ -718,29 +718,20 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T
}
}
if len(driverConfig.Devices) > 0 {
var devices []docker.Device
for _, device := range driverConfig.Devices {
if device.HostPath == "" {
return c, fmt.Errorf("host path must be set in configuration for devices")
}
if device.CgroupPermissions != "" {
for _, char := range device.CgroupPermissions {
ch := string(char)
if ch != "r" && ch != "w" && ch != "m" {
return c, fmt.Errorf("invalid cgroup permission string: %q", device.CgroupPermissions)
}
}
} else {
device.CgroupPermissions = "rwm"
}
dev := docker.Device{
PathOnHost: device.HostPath,
PathInContainer: device.ContainerPath,
CgroupPermissions: device.CgroupPermissions}
devices = append(devices, dev)
// Setup devices
for _, device := range driverConfig.Devices {
dd, err := device.toDockerDevice()
if err != nil {
return c, err
}
hostConfig.Devices = devices
hostConfig.Devices = append(hostConfig.Devices, dd)
}
for _, device := range task.Devices {
hostConfig.Devices = append(hostConfig.Devices, docker.Device{
PathOnHost: device.HostPath,
PathInContainer: device.TaskPath,
CgroupPermissions: device.Permissions,
})
}
// Setup mounts
@@ -763,6 +754,14 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T
hostConfig.Mounts = append(hostConfig.Mounts, hm)
}
for _, m := range task.Mounts {
hostConfig.Mounts = append(hostConfig.Mounts, docker.HostMount{
Type: "bind",
Target: m.TaskPath,
Source: m.HostPath,
ReadOnly: m.Readonly,
})
}
// set DNS search domains and extra hosts
hostConfig.DNSSearch = driverConfig.DNSSearchDomains

View File

@@ -11,6 +11,7 @@ import (
"reflect"
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
"testing"
@@ -1908,6 +1909,95 @@ func TestDockerDriver_MountsSerialization(t *testing.T) {
}
// TestDockerDriver_CreateContainerConfig_MountsCombined asserts that
// devices and mounts set by device managers/plugins are honored
// and present in docker.CreateContainerOptions, and that it is appended
// to any devices/mounts a user sets in the task config.
func TestDockerDriver_CreateContainerConfig_MountsCombined(t *testing.T) {
t.Parallel()
task, cfg, _ := dockerTask(t)
task.Devices = []*drivers.DeviceConfig{
{
HostPath: "/dev/fuse",
TaskPath: "/container/dev/task-fuse",
Permissions: "rw",
},
}
task.Mounts = []*drivers.MountConfig{
{
HostPath: "/tmp/task-mount",
TaskPath: "/container/tmp/task-mount",
Readonly: true,
},
}
cfg.Devices = []DockerDevice{
{
HostPath: "/dev/stdout",
ContainerPath: "/container/dev/cfg-stdout",
CgroupPermissions: "rwm",
},
}
cfg.Mounts = []DockerMount{
{
Type: "bind",
Source: "/tmp/cfg-mount",
Target: "/container/tmp/cfg-mount",
ReadOnly: false,
},
}
require.NoError(t, task.EncodeConcreteDriverConfig(cfg))
dh := dockerDriverHarness(t, nil)
driver := dh.Impl().(*Driver)
c, err := driver.createContainerConfig(task, cfg, "org/repo:0.1")
require.NoError(t, err)
expectedMounts := []docker.HostMount{
{
Type: "bind",
Source: "/tmp/cfg-mount",
Target: "/container/tmp/cfg-mount",
ReadOnly: false,
BindOptions: &docker.BindOptions{},
},
{
Type: "bind",
Source: "/tmp/task-mount",
Target: "/container/tmp/task-mount",
ReadOnly: true,
},
}
foundMounts := c.HostConfig.Mounts
sort.Slice(foundMounts, func(i, j int) bool {
return foundMounts[i].Target < foundMounts[j].Target
})
require.EqualValues(t, expectedMounts, foundMounts)
expectedDevices := []docker.Device{
{
PathOnHost: "/dev/stdout",
PathInContainer: "/container/dev/cfg-stdout",
CgroupPermissions: "rwm",
},
{
PathOnHost: "/dev/fuse",
PathInContainer: "/container/dev/task-fuse",
CgroupPermissions: "rw",
},
}
foundDevices := c.HostConfig.Devices
sort.Slice(foundDevices, func(i, j int) bool {
return foundDevices[i].PathInContainer < foundDevices[j].PathInContainer
})
require.EqualValues(t, expectedDevices, foundDevices)
}
// TestDockerDriver_Cleanup ensures Cleanup removes only downloaded images.
func TestDockerDriver_Cleanup(t *testing.T) {
if !testutil.DockerIsConnected(t) {

View File

@@ -188,3 +188,15 @@ func authIsEmpty(auth *docker.AuthConfiguration) bool {
auth.Email == "" &&
auth.ServerAddress == ""
}
func validateCgroupPermission(s string) bool {
for _, c := range s {
switch c {
case 'r', 'w', 'm':
default:
return false
}
}
return true
}

View File

@@ -0,0 +1,37 @@
package docker
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestValidateCgroupPermission(t *testing.T) {
positiveCases := []string{
"r",
"rw",
"rwm",
"mr",
"mrw",
"",
}
for _, c := range positiveCases {
t.Run("positive case: "+c, func(t *testing.T) {
require.True(t, validateCgroupPermission(c))
})
}
negativeCases := []string{
"q",
"asdf",
"rq",
}
for _, c := range negativeCases {
t.Run("negative case: "+c, func(t *testing.T) {
require.False(t, validateCgroupPermission(c))
})
}
}

View File

@@ -12,7 +12,8 @@ import (
"time"
"github.com/hashicorp/nomad/plugins/drivers"
lxc "gopkg.in/lxc/go-lxc.v2"
ldevices "github.com/opencontainers/runc/libcontainer/devices"
"gopkg.in/lxc/go-lxc.v2"
)
var (
@@ -91,6 +92,33 @@ func networkTypeConfigKey() string {
}
func (d *Driver) mountVolumes(c *lxc.Container, cfg *drivers.TaskConfig, taskConfig TaskConfig) error {
mounts, err := d.mountEntries(cfg, taskConfig)
if err != nil {
return err
}
devCgroupAllows, err := d.devicesCgroupEntries(cfg)
if err != nil {
return err
}
for _, mnt := range mounts {
if err := c.SetConfigItem("lxc.mount.entry", mnt); err != nil {
return fmt.Errorf("error setting bind mount %q error: %v", mnt, err)
}
}
for _, cgroupDev := range devCgroupAllows {
if err := c.SetConfigItem("lxc.cgroup.devices.allow", cgroupDev); err != nil {
return fmt.Errorf("error setting cgroup permission %q error: %v", cgroupDev, err)
}
}
return nil
}
// mountEntries compute the mount entries to be set on the container
func (d *Driver) mountEntries(cfg *drivers.TaskConfig, taskConfig TaskConfig) ([]string, error) {
// Bind mount the shared alloc dir and task local dir in the container
mounts := []string{
fmt.Sprintf("%s local none rw,bind,create=dir", cfg.TaskDir().LocalDir),
@@ -98,6 +126,9 @@ func (d *Driver) mountVolumes(c *lxc.Container, cfg *drivers.TaskConfig, taskCon
fmt.Sprintf("%s secrets none rw,bind,create=dir", cfg.TaskDir().SecretsDir),
}
mounts = append(mounts, d.formatTaskMounts(cfg.Mounts)...)
mounts = append(mounts, d.formatTaskDevices(cfg.Devices)...)
volumesEnabled := d.config.AllowVolumes
for _, volDesc := range taskConfig.Volumes {
@@ -106,23 +137,75 @@ func (d *Driver) mountVolumes(c *lxc.Container, cfg *drivers.TaskConfig, taskCon
if filepath.IsAbs(paths[0]) {
if !volumesEnabled {
return fmt.Errorf("absolute bind-mount volume in config but volumes are disabled")
return nil, fmt.Errorf("absolute bind-mount volume in config but volumes are disabled")
}
} else {
// Relative source paths are treated as relative to alloc dir
paths[0] = filepath.Join(cfg.TaskDir().Dir, paths[0])
}
mounts = append(mounts, fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], paths[1]))
// LXC assumes paths are relative with respect to rootfs
target := strings.TrimLeft(paths[1], "/")
mounts = append(mounts, fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], target))
}
for _, mnt := range mounts {
if err := c.SetConfigItem("lxc.mount.entry", mnt); err != nil {
return fmt.Errorf("error setting bind mount %q error: %v", mnt, err)
return mounts, nil
}
func (d *Driver) devicesCgroupEntries(cfg *drivers.TaskConfig) ([]string, error) {
entries := make([]string, len(cfg.Devices))
for i, d := range cfg.Devices {
hd, err := ldevices.DeviceFromPath(d.HostPath, d.Permissions)
if err != nil {
return nil, err
}
entries[i] = hd.CgroupString()
}
return nil
return entries, nil
}
func (d *Driver) formatTaskMounts(mounts []*drivers.MountConfig) []string {
result := make([]string, len(mounts))
for i, m := range mounts {
result[i] = d.formatMount(m.HostPath, m.TaskPath, m.Readonly)
}
return result
}
func (d *Driver) formatTaskDevices(devices []*drivers.DeviceConfig) []string {
result := make([]string, len(devices))
for i, m := range devices {
result[i] = d.formatMount(m.HostPath, m.TaskPath,
!strings.Contains(m.Permissions, "w"))
}
return result
}
func (d *Driver) formatMount(hostPath, taskPath string, readOnly bool) string {
typ := "dir"
s, err := os.Stat(hostPath)
if err != nil {
d.logger.Warn("failed to find mount host path type, defaulting to dir type", "path", hostPath, "error", err)
} else if !s.IsDir() {
typ = "file"
}
perm := "rw"
if readOnly {
perm = "ro"
}
// LXC assumes paths are relative with respect to rootfs
target := strings.TrimLeft(taskPath, "/")
return fmt.Sprintf("%s %s none %s,bind,create=%s", hostPath, target, perm, typ)
}
func (d *Driver) setResourceLimits(c *lxc.Container, cfg *drivers.TaskConfig) error {

91
drivers/lxc/lxc_test.go Normal file
View File

@@ -0,0 +1,91 @@
//+build linux,lxc
package lxc
import (
"testing"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/stretchr/testify/require"
)
func TestLXCDriver_Mounts(t *testing.T) {
t.Parallel()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
Resources: &drivers.Resources{
NomadResources: &structs.Resources{
CPU: 1,
MemoryMB: 2,
},
LinuxResources: &drivers.LinuxResources{
CPUShares: 1024,
MemoryLimitBytes: 2 * 1024,
},
},
Mounts: []*drivers.MountConfig{
{HostPath: "/dev", TaskPath: "/task-mounts/dev-path"},
{HostPath: "/bin/sh", TaskPath: "/task-mounts/task-path-ro", Readonly: true},
},
Devices: []*drivers.DeviceConfig{
{HostPath: "/dev", TaskPath: "/task-devices/dev-path", Permissions: "rw"},
{HostPath: "/bin/sh", TaskPath: "/task-devices/task-path-ro", Permissions: "ro"},
},
}
taskConfig := TaskConfig{
Template: "busybox",
Volumes: []string{
"relative/path:/usr-config/container/path",
"relative/path2:usr-config/container/relative",
},
}
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
entries, err := d.mountEntries(task, taskConfig)
require.NoError(t, err)
expectedEntries := []string{
"test/relative/path usr-config/container/path none rw,bind,create=dir",
"test/relative/path2 usr-config/container/relative none rw,bind,create=dir",
"/dev task-mounts/dev-path none rw,bind,create=dir",
"/bin/sh task-mounts/task-path-ro none ro,bind,create=file",
"/dev task-devices/dev-path none rw,bind,create=dir",
"/bin/sh task-devices/task-path-ro none ro,bind,create=file",
}
for _, e := range expectedEntries {
require.Contains(t, entries, e)
}
}
func TestLXCDriver_DevicesCgroup(t *testing.T) {
t.Parallel()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
Devices: []*drivers.DeviceConfig{
{HostPath: "/dev/random", TaskPath: "/task-devices/devrandom", Permissions: "rw"},
{HostPath: "/dev/null", TaskPath: "/task-devices/devnull", Permissions: "rwm"},
},
}
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
cgroupEntries, err := d.devicesCgroupEntries(task)
require.NoError(t, err)
expected := []string{
"c 1:8 rw",
"c 1:3 rwm",
}
require.EqualValues(t, expected, cgroupEntries)
}

View File

@@ -488,6 +488,21 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
}
}
// Mount task volumes, always do
for i, vol := range cfg.Mounts {
volName := fmt.Sprintf("%s-%s-taskmounts-%d", cfg.AllocID, sanitizedName, i)
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, vol.Readonly))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath))
}
// Mount task devices, always do
for i, vol := range cfg.Devices {
volName := fmt.Sprintf("%s-%s-taskdevices-%d", cfg.AllocID, sanitizedName, i)
readOnly := !strings.Contains(vol.Permissions, "w")
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, readOnly))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath))
}
// Inject environment variables
for k, v := range cfg.Env {
prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v))

View File

@@ -502,6 +502,89 @@ func TestRktDriver_Start_Wait_Volume(t *testing.T) {
require.NoError(harness.DestroyTask(task.ID, true))
}
// Verifies mounting a task mount from the host machine and writing
// some data to it from inside the container
func TestRktDriver_Start_Wait_TaskMounts(t *testing.T) {
ctestutil.RktCompatible(t)
if !testutil.IsTravis() {
t.Parallel()
}
require := require.New(t)
d := NewRktDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
// mounts through task config should be enabled regardless
config := &Config{VolumesEnabled: false}
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
tmpvol, err := ioutil.TempDir("", "nomadtest_rktdriver_volumes")
require.NoError(err)
defer os.RemoveAll(tmpvol)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
AllocID: uuid.Generate(),
Name: "rkttest_alpine",
Resources: &drivers.Resources{
NomadResources: &structs.Resources{
MemoryMB: 128,
CPU: 100,
},
LinuxResources: &drivers.LinuxResources{
MemoryLimitBytes: 134217728,
CPUShares: 100,
},
},
Mounts: []*drivers.MountConfig{
{HostPath: tmpvol, TaskPath: "/foo", Readonly: false},
},
}
exp := []byte{'w', 'i', 'n'}
file := "output.txt"
hostpath := filepath.Join(tmpvol, file)
taskConfig := map[string]interface{}{
"image": "docker://redis:3.2-alpine",
"command": "/bin/sh",
"args": []string{
"-c",
fmt.Sprintf("echo -n %s > /foo/%s", string(exp), file),
},
"net": []string{"none"},
}
encodeDriverHelper(require, task, taskConfig)
testtask.SetTaskConfigEnv(task)
cleanup := harness.MkAllocDir(task, true)
defer cleanup()
_, _, err = harness.StartTask(task)
require.NoError(err)
// Task should terminate quickly
waitCh, err := harness.WaitTask(context.Background(), task.ID)
require.NoError(err)
select {
case res := <-waitCh:
require.NoError(res.Err)
require.True(res.Successful(), fmt.Sprintf("exit code %v", res.ExitCode))
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("WaitTask timeout")
}
// Check that data was written to the shared alloc directory.
act, err := ioutil.ReadFile(hostpath)
require.NoError(err)
require.Exactly(exp, act)
require.NoError(harness.DestroyTask(task.ID, true))
}
// Verifies port mapping
func TestRktDriver_PortMapping(t *testing.T) {
ctestutil.RktCompatible(t)

View File

@@ -256,6 +256,11 @@ func (d *FsDevice) Reserve(deviceIDs []string) (*device.ContainerReservation, er
return nil, status.New(codes.InvalidArgument, "no device ids given").Err()
}
deviceDir, err := filepath.Abs(d.deviceDir)
if err != nil {
return nil, status.Newf(codes.Internal, "failed to load device dir abs path").Err()
}
resp := &device.ContainerReservation{}
for _, id := range deviceIDs {
@@ -265,10 +270,10 @@ func (d *FsDevice) Reserve(deviceIDs []string) (*device.ContainerReservation, er
}
// Add a mount
resp.Devices = append(resp.Devices, &device.DeviceSpec{
TaskPath: fmt.Sprintf("/dev/%s", id),
HostPath: filepath.Join(d.deviceDir, id),
CgroupPerms: "rw",
resp.Mounts = append(resp.Mounts, &device.Mount{
TaskPath: fmt.Sprintf("/tmp/task-mounts/%s", id),
HostPath: filepath.Join(deviceDir, id),
ReadOnly: false,
})
}

View File

@@ -0,0 +1,105 @@
package devices
import (
"errors"
"io/ioutil"
"os"
"path/filepath"
"github.com/opencontainers/runc/libcontainer/configs"
"golang.org/x/sys/unix"
)
var (
ErrNotADevice = errors.New("not a device node")
)
// Testing dependencies
var (
unixLstat = unix.Lstat
ioutilReadDir = ioutil.ReadDir
)
// Given the path to a device and its cgroup_permissions(which cannot be easily queried) look up the information about a linux device and return that information as a Device struct.
func DeviceFromPath(path, permissions string) (*configs.Device, error) {
var stat unix.Stat_t
err := unixLstat(path, &stat)
if err != nil {
return nil, err
}
var (
devNumber = uint64(stat.Rdev)
major = unix.Major(devNumber)
minor = unix.Minor(devNumber)
)
if major == 0 {
return nil, ErrNotADevice
}
var (
devType rune
mode = stat.Mode
)
switch {
case mode&unix.S_IFBLK == unix.S_IFBLK:
devType = 'b'
case mode&unix.S_IFCHR == unix.S_IFCHR:
devType = 'c'
}
return &configs.Device{
Type: devType,
Path: path,
Major: int64(major),
Minor: int64(minor),
Permissions: permissions,
FileMode: os.FileMode(mode),
Uid: stat.Uid,
Gid: stat.Gid,
}, nil
}
func HostDevices() ([]*configs.Device, error) {
return getDevices("/dev")
}
func getDevices(path string) ([]*configs.Device, error) {
files, err := ioutilReadDir(path)
if err != nil {
return nil, err
}
out := []*configs.Device{}
for _, f := range files {
switch {
case f.IsDir():
switch f.Name() {
// ".lxc" & ".lxd-mounts" added to address https://github.com/lxc/lxd/issues/2825
case "pts", "shm", "fd", "mqueue", ".lxc", ".lxd-mounts":
continue
default:
sub, err := getDevices(filepath.Join(path, f.Name()))
if err != nil {
return nil, err
}
out = append(out, sub...)
continue
}
case f.Name() == "console":
continue
}
device, err := DeviceFromPath(filepath.Join(path, f.Name()), "rwm")
if err != nil {
if err == ErrNotADevice {
continue
}
if os.IsNotExist(err) {
continue
}
return nil, err
}
out = append(out, device)
}
return out, nil
}

1
vendor/vendor.json vendored
View File

@@ -301,6 +301,7 @@
{"path":"github.com/opencontainers/runc/libcontainer/configs","checksumSHA1":"PUv5rdj6oEGJoBij/9Elx8VO6bs=","origin":"github.com/hashicorp/runc/libcontainer/configs","revision":"8df81be073884e4e4c613d893f5877310136820f","revisionTime":"2018-09-10T14:23:11Z","version":"nomad","versionExact":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/configs/validate","checksumSHA1":"YJq+f9izqizSzG/OuVFUOfloNEk=","origin":"github.com/hashicorp/runc/libcontainer/configs/validate","revision":"8df81be073884e4e4c613d893f5877310136820f","revisionTime":"2018-09-10T14:23:11Z","version":"nomad","versionExact":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/criurpc","checksumSHA1":"I1iwXoDUJeDXviilCtkvDpEF/So=","origin":"github.com/hashicorp/runc/libcontainer/criurpc","revision":"8df81be073884e4e4c613d893f5877310136820f","revisionTime":"2018-09-10T14:23:11Z","version":"nomad","versionExact":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/devices","checksumSHA1":"I1iwXoDUJeDXviilCtkvDpEF/So=","origin":"github.com/hashicorp/runc/libcontainer/devices","revision":"8df81be073884e4e4c613d893f5877310136820f","revisionTime":"2018-09-10T14:23:11Z","version":"nomad","versionExact":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/intelrdt","checksumSHA1":"bAWJX1XUDMd4GqPLSrCkUcdiTbg=","origin":"github.com/hashicorp/runc/libcontainer/intelrdt","revision":"459bfaec1fc6c17d8bfb12d0a0f69e7e7271ed2a","revisionTime":"2018-08-23T14:46:37Z","version":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/keys","checksumSHA1":"QXuHZwxlqhoq/oHRJFbTi6+AWLY=","origin":"github.com/hashicorp/runc/libcontainer/keys","revision":"459bfaec1fc6c17d8bfb12d0a0f69e7e7271ed2a","revisionTime":"2018-08-23T14:46:37Z","version":"nomad"},
{"path":"github.com/opencontainers/runc/libcontainer/mount","checksumSHA1":"MJiogPDUU2nFr1fzQU6T+Ry1W8o=","origin":"github.com/hashicorp/runc/libcontainer/mount","revision":"459bfaec1fc6c17d8bfb12d0a0f69e7e7271ed2a","revisionTime":"2018-08-23T14:46:37Z","version":"nomad"},