Add Driver Plugin for LXC

Mahmood Ali
2018-11-25 11:55:01 -05:00
parent 90daa8b2d3
commit b546247626
6 changed files with 1193 additions and 0 deletions

drivers/lxc/driver.go (new file, 510 additions)

@@ -0,0 +1,510 @@
//+build linux,lxc

package lxc
import (
"context"
"fmt"
"strconv"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
lxc "gopkg.in/lxc/go-lxc.v2"
)
const (
// pluginName is the name of the plugin
pluginName = "lxc"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
)
var (
// PluginID is the lxc plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
}
// PluginConfig is the lxc factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewLXCDriver(l) },
}
)
// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
conf := map[string]interface{}{}
if v, err := strconv.ParseBool(opts["driver.lxc.enable"]); err == nil {
conf["enabled"] = v
}
if v, err := strconv.ParseBool(opts["lxc.volumes.enabled"]); err == nil {
conf["volumes"] = v
}
if v, ok := opts["driver.lxc.path"]; ok {
conf["path"] = v
}
return conf, nil
}
var (
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
PluginVersion: "0.1.0",
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"enabled": hclspec.NewDefault(
hclspec.NewAttr("enabled", "bool", false),
hclspec.NewLiteral("true"),
),
"volumes": hclspec.NewDefault(
hclspec.NewAttr("volumes", "bool", false),
hclspec.NewLiteral("true"),
),
"path": hclspec.NewDefault(
hclspec.NewAttr("path", "string", false),
hclspec.NewLiteral("\"\""),
),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"template": hclspec.NewAttr("template", "string", true),
"distro": hclspec.NewAttr("distro", "string", false),
"release": hclspec.NewAttr("release", "string", false),
"arch": hclspec.NewAttr("arch", "string", false),
"image_variant": hclspec.NewAttr("image_variant", "string", false),
"image_server": hclspec.NewAttr("image_server", "string", false),
"gpg_key_id": hclspec.NewAttr("gpg_key_id", "string", false),
"gpg_key_server": hclspec.NewAttr("gpg_key_server", "string", false),
"disable_gpg": hclspec.NewAttr("disable_gpg", "string", false),
"flush_cache": hclspec.NewAttr("flush_cache", "string", false),
"force_cache": hclspec.NewAttr("force_cache", "string", false),
"template_args": hclspec.NewAttr("template_args", "list(string)", false),
"log_level": hclspec.NewAttr("log_level", "string", false),
"verbosity": hclspec.NewAttr("verbosity", "string", false),
"volumes": hclspec.NewAttr("volumes", "list(string)", false),
})
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
capabilities = &drivers.Capabilities{
SendSignals: false,
Exec: false,
FSIsolation: cstructs.FSIsolationImage,
}
)
// Driver is a task driver plugin that runs tasks inside LXC containers,
// relying on the container runtime for filesystem and resource isolation.
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config *Config
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to their task handles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
// Config is the driver configuration set by the SetConfig RPC call
type Config struct {
// Enabled is set to true to enable the lxc driver
Enabled bool `codec:"enabled"`
AllowVolumes bool `codec:"volumes"`
Path string `codec:"path"`
}
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
Template string `codec:"template"`
Distro string `codec:"distro"`
Release string `codec:"release"`
Arch string `codec:"arch"`
ImageVariant string `codec:"image_variant"`
ImageServer string `codec:"image_server"`
GPGKeyID string `codec:"gpg_key_id"`
GPGKeyServer string `codec:"gpg_key_server"`
DisableGPGValidation bool `codec:"disable_gpg"`
FlushCache bool `codec:"flush_cache"`
ForceCache bool `codec:"force_cache"`
TemplateArgs []string `codec:"template_args"`
LogLevel string `codec:"log_level"`
Verbosity string `codec:"verbosity"`
Volumes []string `codec:"volumes"`
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
TaskConfig *drivers.TaskConfig
ContainerName string
StartedAt time.Time
}
// NewLXCDriver returns a new DriverPlugin implementation
func NewLXCDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
}
}
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
func (d *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
var config Config
if err := base.MsgPackDecode(data, &config); err != nil {
return err
}
d.config = &config
if cfg != nil {
d.nomadConfig = cfg.Driver
}
return nil
}
func (d *Driver) Shutdown(ctx context.Context) error {
d.signalShutdown()
return nil
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
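// a Timer with zero duration fires immediately, so the first fingerprint
// is sent right away rather than after a full fingerprintPeriod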
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
var health drivers.HealthState
var desc string
attrs := map[string]string{}
lxcVersion := lxc.Version()
if d.config.Enabled && lxcVersion != "" {
health = drivers.HealthStateHealthy
desc = "ready"
attrs["driver.lxc"] = "1"
attrs["driver.lxc.version"] = lxcVersion
} else {
health = drivers.HealthStateUndetected
desc = "disabled"
}
if d.config.AllowVolumes {
attrs["driver.lxc.volumes.enabled"] = "1"
}
return &drivers.Fingerprint{
Attributes: attrs,
Health: health,
HealthDescription: desc,
}
}
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
}
if _, ok := d.tasks.Get(handle.Config.ID); ok {
return nil
}
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
var driverConfig TaskConfig
if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
return fmt.Errorf("failed to decode driver config: %v", err)
}
c, err := lxc.NewContainer(taskState.ContainerName, d.lxcPath())
if err != nil {
return fmt.Errorf("failed to create container ref: %v", err)
}
initPid := c.InitPid()
h := &taskHandle{
container: c,
initPid: initPid,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
totalCpuStats: stats.NewCpuStats(),
userCpuStats: stats.NewCpuStats(),
systemCpuStats: stats.NewCpuStats(),
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
d.logger.Info("starting lxc task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(pluginName)
handle.Config = cfg
c, err := d.initializeContainer(cfg, driverConfig)
if err != nil {
return nil, nil, err
}
opt := toLXCCreateOptions(driverConfig)
if err := c.Create(opt); err != nil {
return nil, nil, fmt.Errorf("unable to create container: %v", err)
}
cleanup := func() {
if err := c.Destroy(); err != nil {
d.logger.Error("failed to clean up from an error in Start", "error", err)
}
}
if err := d.configureContainerNetwork(c); err != nil {
cleanup()
return nil, nil, err
}
if err := d.mountVolumes(c, cfg, driverConfig); err != nil {
cleanup()
return nil, nil, err
}
if err := c.Start(); err != nil {
cleanup()
return nil, nil, fmt.Errorf("unable to start container: %v", err)
}
if err := d.setResourceLimits(c, cfg); err != nil {
cleanup()
return nil, nil, err
}
pid := c.InitPid()
h := &taskHandle{
container: c,
initPid: pid,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
totalCpuStats: stats.NewCpuStats(),
userCpuStats: stats.NewCpuStats(),
systemCpuStats: stats.NewCpuStats(),
}
driverState := TaskState{
ContainerName: c.Name(),
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
cleanup()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
s := handle.TaskStatus()
if s.State == drivers.TaskStateExited {
ch <- handle.exitResult
}
}
}
}
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if err := handle.container.Shutdown(timeout); err != nil {
if err := handle.container.Stop(); err != nil {
return fmt.Errorf("executor Shutdown failed: %v", err)
}
}
return nil
}
func (d *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
}
if handle.IsRunning() {
if err := handle.container.Shutdown(0); err != nil {
handle.logger.Error("failed to destroy executor", "err", err)
}
}
d.tasks.Delete(taskID)
return nil
}
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.TaskStatus(), nil
}
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.stats()
}
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
func (d *Driver) SignalTask(taskID string, signal string) error {
return fmt.Errorf("LXC driver does not support signals")
}
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
if len(cmd) == 0 {
return nil, fmt.Errorf("error cmd must have atleast one value")
}
return nil, fmt.Errorf("LXC driver does not support exec")
}
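
As a quick illustration of the legacy option mapping, here is a minimal sketch of PluginLoader in use. The standalone main() is illustrative only, and compiling against this package requires Linux and the lxc build tag:

package main

import (
	"fmt"

	lxcdriver "github.com/hashicorp/nomad/drivers/lxc"
)

func main() {
	// pre-0.9 client options, keyed exactly as PluginLoader expects
	conf, err := lxcdriver.PluginLoader(map[string]string{
		"driver.lxc.enable":   "true",
		"lxc.volumes.enabled": "false",
		"driver.lxc.path":     "/var/lib/lxc",
	})
	if err != nil {
		panic(err)
	}
	// post-0.9 plugin options: map[enabled:true path:/var/lib/lxc volumes:false]
	fmt.Println(conf)
}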

drivers/lxc/driver_test.go (new file, 273 additions)

@@ -0,0 +1,273 @@
// +build linux,lxc

package lxc
import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/hashicorp/hcl2/hcl"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
lxc "gopkg.in/lxc/go-lxc.v2"
)
func TestLXCDriver_Fingerprint(t *testing.T) {
t.Parallel()
requireLXC(t)
require := require.New(t)
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
harness := drivers.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case finger := <-fingerCh:
require.Equal(drivers.HealthStateHealthy, finger.Health)
require.Equal("1", finger.Attributes["driver.lxc"])
require.NotEmpty(finger.Attributes["driver.lxc.version"])
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
}
}
func TestLXCDriver_FingerprintNotEnabled(t *testing.T) {
t.Parallel()
requireLXC(t)
require := require.New(t)
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = false
harness := drivers.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case finger := <-fingerCh:
require.Equal(drivers.HealthStateUndetected, finger.Health)
require.Equal("", finger.Attributes["driver.lxc"])
require.Empty(finger.Attributes["driver.lxc.version"])
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
}
}
func TestLXCDriver_Start_Wait(t *testing.T) {
if !testutil.IsTravis() {
t.Parallel()
}
requireLXC(t)
ctestutil.RequireRoot(t)
require := require.New(t)
// prepare test file
testFileContents := []byte("this should be visible under /mnt/tmp")
tmpFile, err := ioutil.TempFile("/tmp", "testlxcdriver_start_wait")
if err != nil {
t.Fatalf("error writing temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
if _, err := tmpFile.Write(testFileContents); err != nil {
t.Fatalf("error writing temp file: %v", err)
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("error closing temp file: %v", err)
}
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
d.config.AllowVolumes = true
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
Resources: &drivers.Resources{
NomadResources: &structs.Resources{
CPU: 1,
MemoryMB: 2,
},
LinuxResources: &drivers.LinuxResources{
CPUShares: 1024,
MemoryLimitBytes: 2 * 1024,
},
},
}
taskConfig := map[string]interface{}{
"template": "/usr/share/lxc/templates/lxc-busybox",
"volumes": []string{"/tmp/:mnt/tmp"},
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
t.Logf("alloc dir: %s", task.AllocDir)
handle, _, err := harness.StartTask(task)
require.NoError(err)
require.NotNil(handle)
lxcHandle, ok := d.tasks.Get(task.ID)
require.True(ok)
container := lxcHandle.container
// Destroy container after test
defer func() {
container.Stop()
container.Destroy()
}()
// Test that container is running
testutil.WaitForResult(func() (bool, error) {
state := container.State()
if state == lxc.RUNNING {
return true, nil
}
return false, fmt.Errorf("container in state: %v", state)
}, func(err error) {
t.Fatalf("container failed to start: %v", err)
})
// Test that directories are mounted in their proper location
containerName := container.Name()
for _, mnt := range []string{"alloc", "local", "secrets", "mnt/tmp"} {
fullpath := filepath.Join(d.lxcPath(), containerName, "rootfs", mnt)
stat, err := os.Stat(fullpath)
require.NoError(err)
require.True(stat.IsDir())
}
// Test bind mount volumes exist in container:
mountedContents, err := exec.Command("lxc-attach",
"-n", containerName, "--",
"cat", filepath.Join("/mnt/", tmpFile.Name()),
).Output()
require.NoError(err)
require.Equal(string(testFileContents), string(mountedContents))
// Test that killing container marks container as stopped
require.NoError(container.Stop())
testutil.WaitForResult(func() (bool, error) {
status, err := d.InspectTask(task.ID)
if err == nil && status.State == drivers.TaskStateExited {
return true, nil
}
return false, fmt.Errorf("task in state: %v", status.State)
}, func(err error) {
t.Fatalf("task was not marked as stopped: %v", err)
})
}
func TestLXCDriver_Start_Stop(t *testing.T) {
if !testutil.IsTravis() {
t.Parallel()
}
requireLXC(t)
ctestutil.RequireRoot(t)
require := require.New(t)
d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
d.config.AllowVolumes = true
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
Resources: &drivers.Resources{
NomadResources: &structs.Resources{
CPU: 1,
MemoryMB: 2,
},
LinuxResources: &drivers.LinuxResources{
CPUShares: 1024,
MemoryLimitBytes: 2 * 1024,
},
},
}
taskConfig := map[string]interface{}{
"template": "/usr/share/lxc/templates/lxc-busybox",
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
t.Logf("alloc dir: %s", task.AllocDir)
handle, _, err := harness.StartTask(task)
require.NoError(err)
require.NotNil(handle)
lxcHandle, ok := d.tasks.Get(task.ID)
require.True(ok)
container := lxcHandle.container
// Destroy container after test
defer func() {
container.Stop()
container.Destroy()
}()
// Test that container is running
testutil.WaitForResult(func() (bool, error) {
state := container.State()
if state == lxc.RUNNING {
return true, nil
}
return false, fmt.Errorf("container in state: %v", state)
}, func(err error) {
t.Fatalf("container failed to start: %v", err)
})
require.NoError(d.StopTask(task.ID, 5*time.Second, "kill"))
testutil.WaitForResult(func() (bool, error) {
status, err := d.InspectTask(task.ID)
if err == nil && status.State == drivers.TaskStateExited {
return true, nil
}
return false, fmt.Errorf("task in state: %v", status.State)
}, func(err error) {
t.Fatalf("task was not marked as stopped: %v", err)
})
}
func requireLXC(t *testing.T) {
if lxc.Version() == "" {
t.Skip("skipping, lxc not present")
}
}
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)
}

drivers/lxc/handle.go (new file, 189 additions)

@@ -0,0 +1,189 @@
//+build linux,lxc

package lxc
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/drivers"
lxc "gopkg.in/lxc/go-lxc.v2"
)
type taskHandle struct {
container *lxc.Container
initPid int
logger hclog.Logger
totalCpuStats *stats.CpuStats
userCpuStats *stats.CpuStats
systemCpuStats *stats.CpuStats
// stateLock syncs access to all fields below
stateLock sync.RWMutex
taskConfig *drivers.TaskConfig
procState drivers.TaskState
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
}
var (
LXCMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"}
LXCMeasuredMemStats = []string{"RSS", "Cache", "Swap", "Max Usage", "Kernel Usage", "Kernel Max Usage"}
)
func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
h.stateLock.RLock()
defer h.stateLock.RUnlock()
return &drivers.TaskStatus{
ID: h.taskConfig.ID,
Name: h.taskConfig.Name,
State: h.procState,
StartedAt: h.startedAt,
CompletedAt: h.completedAt,
ExitResult: h.exitResult,
DriverAttributes: map[string]string{
"pid": strconv.Itoa(h.initPid),
},
}
}
func (h *taskHandle) IsRunning() bool {
h.stateLock.RLock()
defer h.stateLock.RUnlock()
return h.procState == drivers.TaskStateRunning
}
func (h *taskHandle) run() {
h.stateLock.Lock()
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
}
h.stateLock.Unlock()
if ok, err := waitTillStopped(h.container); !ok {
h.logger.Error("failed to find container process", "error", err)
return
}
h.stateLock.Lock()
defer h.stateLock.Unlock()
h.procState = drivers.TaskStateExited
h.exitResult.ExitCode = 0
h.exitResult.Signal = 0
h.completedAt = time.Now()
// TODO: detect if the task OOMed
}
func (h *taskHandle) stats() (*cstructs.TaskResourceUsage, error) {
cpuStats, err := h.container.CPUStats()
if err != nil {
h.logger.Error("failed to get container cpu stats", "error", err)
return nil, err
}
total, err := h.container.CPUTime()
if err != nil {
h.logger.Error("failed to get container cpu time", "error", err)
return nil, err
}
t := time.Now()
// Get the cpu stats
system := cpuStats["system"]
user := cpuStats["user"]
cs := &cstructs.CpuStats{
SystemMode: h.systemCpuStats.Percent(float64(system)),
UserMode: h.userCpuStats.Percent(float64(user)),
Percent: h.totalCpuStats.Percent(float64(total)),
TotalTicks: float64(user + system),
Measured: LXCMeasuredCpuStats,
}
// Get the Memory Stats
memData := map[string]uint64{
"rss": 0,
"cache": 0,
"swap": 0,
}
rawMemStats := h.container.CgroupItem("memory.stat")
for _, rawMemStat := range rawMemStats {
key, val, err := keysToVal(rawMemStat)
if err != nil {
h.logger.Error("failed to get stat", "line", rawMemStat, "error", err)
continue
}
if _, ok := memData[key]; ok {
memData[key] = val
}
}
ms := &cstructs.MemoryStats{
RSS: memData["rss"],
Cache: memData["cache"],
Swap: memData["swap"],
Measured: LXCMeasuredMemStats,
}
mu := h.container.CgroupItem("memory.max_usage_in_bytes")
for _, rawMemMaxUsage := range mu {
val, err := strconv.ParseUint(rawMemMaxUsage, 10, 64)
if err != nil {
h.logger.Error("failed to get max memory usage", "error", err)
continue
}
ms.MaxUsage = val
}
ku := h.container.CgroupItem("memory.kmem.usage_in_bytes")
for _, rawKernelUsage := range ku {
val, err := strconv.ParseUint(rawKernelUsage, 10, 64)
if err != nil {
h.logger.Error("failed to get kernel memory usage", "error", err)
continue
}
ms.KernelUsage = val
}
mku := h.container.CgroupItem("memory.kmem.max_usage_in_bytes")
for _, rawMaxKernelUsage := range mku {
val, err := strconv.ParseUint(rawMaxKernelUsage, 10, 64)
if err != nil {
h.logger.Error("failed tog get max kernel memory usage", "error", err)
continue
}
ms.KernelMaxUsage = val
}
taskResUsage := cstructs.TaskResourceUsage{
ResourceUsage: &cstructs.ResourceUsage{
CpuStats: cs,
MemoryStats: ms,
},
Timestamp: t.UTC().UnixNano(),
}
return &taskResUsage, nil
}
func keysToVal(line string) (string, uint64, error) {
tokens := strings.Split(line, " ")
if len(tokens) != 2 {
return "", 0, fmt.Errorf("line isn't a k/v pair")
}
key := tokens[0]
val, err := strconv.ParseUint(tokens[1], 10, 64)
return key, val, err
}
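
keysToVal expects the "key value" pairs that cgroup v1 exposes in memory.stat, one per line. A minimal sketch of the parsing, assuming package lxc scope and an fmt import (the sample lines are illustrative):

for _, line := range []string{"rss 4096", "cache 8192", "swap 0"} {
	key, val, err := keysToVal(line)
	if err != nil {
		continue // skip malformed lines, as stats() does
	}
	fmt.Printf("%s=%d\n", key, val) // prints rss=4096, cache=8192, swap=0
}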

drivers/lxc/lxc.go (new file, 172 additions)

@@ -0,0 +1,172 @@
//+build linux,lxc

package lxc
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/drivers"
lxc "gopkg.in/lxc/go-lxc.v2"
)
var (
verbosityLevels = map[string]lxc.Verbosity{
"": lxc.Quiet,
"verbose": lxc.Verbose,
"quiet": lxc.Quiet,
}
logLevels = map[string]lxc.LogLevel{
"": lxc.ERROR,
"debug": lxc.DEBUG,
"error": lxc.ERROR,
"info": lxc.INFO,
"trace": lxc.TRACE,
"warn": lxc.WARN,
}
)
const (
// containerMonitorIntv is the interval at which the driver checks if the
// container is still alive
containerMonitorIntv = 2 * time.Second
)
func (d *Driver) lxcPath() string {
lxcPath := d.config.Path
if lxcPath == "" {
lxcPath = lxc.DefaultConfigPath()
}
return lxcPath
}
func (d *Driver) initializeContainer(cfg *drivers.TaskConfig, taskConfig TaskConfig) (*lxc.Container, error) {
lxcPath := d.lxcPath()
containerName := fmt.Sprintf("%s-%s", cfg.Name, uuid.Generate())
c, err := lxc.NewContainer(containerName, lxcPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize container: %v", err)
}
if v, ok := verbosityLevels[taskConfig.Verbosity]; ok {
c.SetVerbosity(v)
} else {
return nil, fmt.Errorf("lxc driver config 'verbosity' can only be either quiet or verbose")
}
if v, ok := logLevels[taskConfig.LogLevel]; ok {
c.SetLogLevel(v)
} else {
return nil, fmt.Errorf("lxc driver config 'log_level' can only be trace, debug, info, warn or error")
}
logFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%v-lxc.log", cfg.Name))
c.SetLogFile(logFile)
return c, nil
}
func (d *Driver) configureContainerNetwork(c *lxc.Container) error {
// Set the network type to none
if err := c.SetConfigItem(networkTypeConfigKey(), "none"); err != nil {
return fmt.Errorf("error setting network type configuration: %v", err)
}
return nil
}
func networkTypeConfigKey() string {
if lxc.VersionAtLeast(2, 1, 0) {
return "lxc.net.0.type"
}
// prior to 2.1, network configuration keys used the lxc.network prefix
return "lxc.network.type"
}
func (d *Driver) mountVolumes(c *lxc.Container, cfg *drivers.TaskConfig, taskConfig TaskConfig) error {
// Bind mount the shared alloc dir and task local dir in the container
mounts := []string{
fmt.Sprintf("%s local none rw,bind,create=dir", cfg.TaskDir().LocalDir),
fmt.Sprintf("%s alloc none rw,bind,create=dir", cfg.TaskDir().SharedAllocDir),
fmt.Sprintf("%s secrets none rw,bind,create=dir", cfg.TaskDir().SecretsDir),
}
volumesEnabled := d.config.AllowVolumes
for _, volDesc := range taskConfig.Volumes {
// the format was checked in Validate()
paths := strings.Split(volDesc, ":")
if filepath.IsAbs(paths[0]) {
if !volumesEnabled {
return fmt.Errorf("absolute bind-mount volume in config but volumes are disabled")
}
} else {
// Relative source paths are treated as relative to alloc dir
paths[0] = filepath.Join(cfg.TaskDir().Dir, paths[0])
}
mounts = append(mounts, fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], paths[1]))
}
for _, mnt := range mounts {
if err := c.SetConfigItem("lxc.mount.entry", mnt); err != nil {
return fmt.Errorf("error setting bind mount %q error: %v", mnt, err)
}
}
return nil
}
func (d *Driver) setResourceLimits(c *lxc.Container, cfg *drivers.TaskConfig) error {
if err := c.SetMemoryLimit(lxc.ByteSize(cfg.Resources.NomadResources.MemoryMB) * lxc.MB); err != nil {
return fmt.Errorf("unable to set memory limits: %v", err)
}
if err := c.SetCgroupItem("cpu.shares", strconv.FormatInt(cfg.Resources.LinuxResources.CPUShares, 10)); err != nil {
return fmt.Errorf("unable to set cpu shares: %v", err)
}
return nil
}
func toLXCCreateOptions(taskConfig TaskConfig) lxc.TemplateOptions {
return lxc.TemplateOptions{
Template: taskConfig.Template,
Distro: taskConfig.Distro,
Release: taskConfig.Release,
Arch: taskConfig.Arch,
FlushCache: taskConfig.FlushCache,
DisableGPGValidation: taskConfig.DisableGPGValidation,
ExtraArgs: taskConfig.TemplateArgs,
}
}
// waitTillStopped blocks and returns true when the container stops;
// it returns false with an error if the container process cannot be identified.
//
// Use this in preference to c.Wait(): the lxc Wait() function holds a write
// lock on the container, blocking any other operation on it, including
// looking up container stats.
func waitTillStopped(c *lxc.Container) (bool, error) {
ps, err := os.FindProcess(c.InitPid())
if err != nil {
return false, err
}
for {
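// Signal(0) delivers no signal but fails once the process has exited,
// making it a cheap liveness probe for the container's init process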
if err := ps.Signal(syscall.Signal(0)); err != nil {
return true, nil
}
time.Sleep(containerMonitorIntv)
}
}
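
For reference, a sketch of the fstab-style line that mountVolumes produces for a single task volume. The task directory and volume spec are illustrative, and the fragment assumes package scope with fmt, strings, and path/filepath imported:

taskDir := "/var/nomad/alloc/example/task" // stands in for cfg.TaskDir().Dir
volDesc := "relative/data:mnt/data"        // host:container volume spec
paths := strings.Split(volDesc, ":")
if !filepath.IsAbs(paths[0]) {
	// relative sources are joined to the task dir, as in mountVolumes
	paths[0] = filepath.Join(taskDir, paths[0])
}
entry := fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], paths[1])
// entry == "/var/nomad/alloc/example/task/relative/data mnt/data none rw,bind,create=dir"
fmt.Println(entry)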

drivers/lxc/state.go (new file, 35 additions)

@@ -0,0 +1,35 @@
//+build linux,lxc

package lxc
import (
"sync"
)
type taskStore struct {
store map[string]*taskHandle
lock sync.RWMutex
}
func newTaskStore() *taskStore {
return &taskStore{store: map[string]*taskHandle{}}
}
func (ts *taskStore) Set(id string, handle *taskHandle) {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.store[id] = handle
}
func (ts *taskStore) Get(id string) (*taskHandle, bool) {
ts.lock.RLock()
defer ts.lock.RUnlock()
t, ok := ts.store[id]
return t, ok
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()
delete(ts.store, id)
}

(new file, 14 additions; file path not shown)

@@ -0,0 +1,14 @@
//+build linux,lxc

package catalog
import (
"github.com/hashicorp/nomad/drivers/lxc"
)
// This file is where all builtin plugins should be registered in the catalog.
// Plugins with build restrictions should be placed in the appropriate
// register_XXX.go file.
func init() {
RegisterDeferredConfig(lxc.PluginID, lxc.PluginConfig, lxc.PluginLoader)
}
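
All six files carry the linux,lxc build constraint, so the driver is compiled into Nomad only on Linux builds that opt in with the lxc tag (for example, go build -tags lxc).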