diff --git a/drivers/lxc/driver.go b/drivers/lxc/driver.go
new file mode 100644
index 000000000..8291deab8
--- /dev/null
+++ b/drivers/lxc/driver.go
@@ -0,0 +1,510 @@
+//+build linux,lxc
+
+package lxc
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/client/stats"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/drivers/shared/eventer"
+	"github.com/hashicorp/nomad/plugins/base"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	"github.com/hashicorp/nomad/plugins/shared/hclspec"
+	"github.com/hashicorp/nomad/plugins/shared/loader"
+
+	lxc "gopkg.in/lxc/go-lxc.v2"
+)
+
+const (
+	// pluginName is the name of the plugin
+	pluginName = "lxc"
+
+	// fingerprintPeriod is the interval at which the driver will send fingerprint responses
+	fingerprintPeriod = 30 * time.Second
+)
+
+var (
+	// PluginID is the lxc plugin metadata registered in the plugin
+	// catalog.
+	PluginID = loader.PluginID{
+		Name:       pluginName,
+		PluginType: base.PluginTypeDriver,
+	}
+
+	// PluginConfig is the lxc factory function registered in the
+	// plugin catalog. The factory builds the driver with NewLXCDriver.
+	PluginConfig = &loader.InternalPluginConfig{
+		Config:  map[string]interface{}{},
+		Factory: func(l hclog.Logger) interface{} { return NewLXCDriver(l) },
+	}
+)
+
+// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
+func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
+	conf := map[string]interface{}{}
+	// A missing or unparsable boolean option is skipped on purpose so that
+	// the default declared in configSpec applies.
+	if v, err := strconv.ParseBool(opts["driver.lxc.enable"]); err == nil {
+		conf["enabled"] = v
+	}
+	if v, err := strconv.ParseBool(opts["lxc.volumes.enabled"]); err == nil {
+		conf["volumes"] = v
+	}
+	if v, ok := opts["driver.lxc.path"]; ok {
+		conf["path"] = v
+	}
+	return conf, nil
+}
+
+var (
+	// pluginInfo is the response returned for the PluginInfo RPC
+	pluginInfo = &base.PluginInfoResponse{
+		Type:             base.PluginTypeDriver,
+		PluginApiVersion: "0.0.1",
+		PluginVersion:    "0.1.0",
+		Name:             pluginName,
+	}
+
+	// configSpec is the hcl specification returned by the ConfigSchema RPC
+	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"enabled": hclspec.NewDefault(
+			hclspec.NewAttr("enabled", "bool", false),
+			hclspec.NewLiteral("true"),
+		),
+		"volumes": hclspec.NewDefault(
+			hclspec.NewAttr("volumes", "bool", false),
+			hclspec.NewLiteral("true"),
+		),
+		"path": hclspec.NewDefault(
+			hclspec.NewAttr("path", "string", false),
+			hclspec.NewLiteral("\"\""),
+		),
+	})
+
+	// taskConfigSpec is the hcl specification for the driver config section of
+	// a task within a job. It is returned in the TaskConfigSchema RPC.
+	//
+	// The attribute types must match the Go types of the TaskConfig fields
+	// carrying the same codec tag: disable_gpg, flush_cache and force_cache
+	// decode into bool fields, so they are declared as "bool" here.
+	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"template":       hclspec.NewAttr("template", "string", true),
+		"distro":         hclspec.NewAttr("distro", "string", false),
+		"release":        hclspec.NewAttr("release", "string", false),
+		"arch":           hclspec.NewAttr("arch", "string", false),
+		"image_variant":  hclspec.NewAttr("image_variant", "string", false),
+		"image_server":   hclspec.NewAttr("image_server", "string", false),
+		"gpg_key_id":     hclspec.NewAttr("gpg_key_id", "string", false),
+		"gpg_key_server": hclspec.NewAttr("gpg_key_server", "string", false),
+		"disable_gpg":    hclspec.NewAttr("disable_gpg", "bool", false),
+		"flush_cache":    hclspec.NewAttr("flush_cache", "bool", false),
+		"force_cache":    hclspec.NewAttr("force_cache", "bool", false),
+		"template_args":  hclspec.NewAttr("template_args", "list(string)", false),
+		"log_level":      hclspec.NewAttr("log_level", "string", false),
+		"verbosity":      hclspec.NewAttr("verbosity", "string", false),
+		"volumes":        hclspec.NewAttr("volumes", "list(string)", false),
+	})
+
+	// capabilities is returned by the Capabilities RPC and indicates what
+	// optional features this driver supports
+	capabilities = &drivers.Capabilities{
+		SendSignals: false,
+		Exec:        false,
+		FSIsolation: cstructs.FSIsolationImage,
+	}
+)
+
+// Driver is the Nomad task driver plugin for LXC. It creates one LXC
+// container per task from a template and keeps a handle for every started
+// container in memory.
+type Driver struct {
+	// eventer is used to handle multiplexing of TaskEvents calls such that an
+	// event can be broadcast to all callers
+	eventer *eventer.Eventer
+
+	// config is the driver configuration set by the SetConfig RPC
+	config *Config
+
+	// nomadConfig is the client config from nomad
+	nomadConfig *base.ClientDriverConfig
+
+	// tasks is the in memory datastore mapping taskIDs to taskHandles
+	tasks *taskStore
+
+	// ctx is the context for the driver. It is passed to other subsystems to
+	// coordinate shutdown
+	ctx context.Context
+
+	// signalShutdown is called when the driver is shutting down and cancels the
+	// ctx passed to any subsystems
+	signalShutdown context.CancelFunc
+
+	// logger will log to the Nomad agent
+	logger hclog.Logger
+}
+
+// Config is the driver configuration set by the SetConfig RPC call
+type Config struct {
+	// Enabled is set to true to enable the lxc driver
+	Enabled bool `codec:"enabled"`
+
+	// AllowVolumes permits tasks to bind-mount absolute host paths; when it
+	// is false such volume entries are rejected at task start.
+	AllowVolumes bool `codec:"volumes"`
+
+	// Path is the lxc path used for containers. When empty the lxc default
+	// config path is used.
+	Path string `codec:"path"`
+}
+
+// TaskConfig is the driver configuration of a task within a job
+type TaskConfig struct {
+	// Template and the fields below up to TemplateArgs describe how the
+	// container is created; toLXCCreateOptions converts them into lxc
+	// template options.
+	Template             string   `codec:"template"`
+	Distro               string   `codec:"distro"`
+	Release              string   `codec:"release"`
+	Arch                 string   `codec:"arch"`
+	ImageVariant         string   `codec:"image_variant"`
+	ImageServer          string   `codec:"image_server"`
+	GPGKeyID             string   `codec:"gpg_key_id"`
+	GPGKeyServer         string   `codec:"gpg_key_server"`
+	DisableGPGValidation bool     `codec:"disable_gpg"`
+	FlushCache           bool     `codec:"flush_cache"`
+	ForceCache           bool     `codec:"force_cache"`
+	TemplateArgs         []string `codec:"template_args"`
+
+	// LogLevel is one of trace, debug, info, warn or error; empty means error.
+	LogLevel string `codec:"log_level"`
+
+	// Verbosity is either quiet or verbose; empty means quiet.
+	Verbosity string `codec:"verbosity"`
+
+	// Volumes lists bind mounts as "source:destination" pairs.
+	Volumes []string `codec:"volumes"`
+}
+
+// TaskState is the state which is encoded in the handle returned in
+// StartTask. This information is needed to rebuild the task state and handler
+// during recovery.
+type TaskState struct {
+	TaskConfig    *drivers.TaskConfig
+	ContainerName string
+	StartedAt     time.Time
+}
+
+// NewLXCDriver returns a new DriverPlugin implementation
+func NewLXCDriver(logger hclog.Logger) drivers.DriverPlugin {
+	ctx, cancel := context.WithCancel(context.Background())
+	logger = logger.Named(pluginName)
+	return &Driver{
+		eventer:        eventer.NewEventer(ctx, logger),
+		config:         &Config{},
+		tasks:          newTaskStore(),
+		ctx:            ctx,
+		signalShutdown: cancel,
+		logger:         logger,
+	}
+}
+
+// PluginInfo returns the static plugin metadata.
+func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
+	return pluginInfo, nil
+}
+
+// ConfigSchema returns the hcl specification of the plugin configuration.
+func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
+	return configSpec, nil
+}
+
+// SetConfig decodes the msgpack encoded plugin configuration and stores it
+// together with the client driver configuration, when one is given.
+func (d *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
+	var config Config
+	if err := base.MsgPackDecode(data, &config); err != nil {
+		return err
+	}
+
+	d.config = &config
+	if cfg != nil {
+		d.nomadConfig = cfg.Driver
+	}
+
+	return nil
+}
+
+// Shutdown cancels the driver context, stopping the driver's background loops.
+func (d *Driver) Shutdown(ctx context.Context) error {
+	d.signalShutdown()
+	return nil
+}
+
+// TaskConfigSchema returns the hcl specification of a task's driver config.
+func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
+	return taskConfigSpec, nil
+}
+
+// Capabilities returns the optional features supported by this driver.
+func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
+	return capabilities, nil
+}
+
+// Fingerprint returns a channel on which a fingerprint is sent immediately and
+// then every fingerprintPeriod until ctx or the driver context is cancelled.
+func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
+	ch := make(chan *drivers.Fingerprint)
+	go d.handleFingerprint(ctx, ch)
+	return ch, nil
+}
+
+// handleFingerprint feeds ch until one of the contexts is done and then closes
+// it.
+func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
+	defer close(ch)
+	ticker := time.NewTimer(0)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.ctx.Done():
+			return
+		case <-ticker.C:
+			ticker.Reset(fingerprintPeriod)
+			// The channel is unbuffered; do not block forever on a receiver
+			// that has gone away.
+			select {
+			case ch <- d.buildFingerprint():
+			case <-ctx.Done():
+				return
+			case <-d.ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+// buildFingerprint reports the driver as healthy when it is enabled and an lxc
+// version can be detected, and as undetected otherwise.
+func (d *Driver) buildFingerprint() *drivers.Fingerprint {
+	var health drivers.HealthState
+	var desc string
+	attrs := map[string]string{}
+
+	lxcVersion := lxc.Version()
+
+	if d.config.Enabled && lxcVersion != "" {
+		health = drivers.HealthStateHealthy
+		desc = "ready"
+		attrs["driver.lxc"] = "1"
+		attrs["driver.lxc.version"] = lxcVersion
+	} else {
+		health = drivers.HealthStateUndetected
+		desc = "disabled"
+	}
+
+	if d.config.AllowVolumes {
+		attrs["driver.lxc.volumes.enabled"] = "1"
+	}
+
+	return &drivers.Fingerprint{
+		Attributes:        attrs,
+		Health:            health,
+		HealthDescription: desc,
+	}
+}
+
+// RecoverTask rebuilds the in-memory handle of a task from the state stored in
+// its TaskHandle. It is a no-op when the task is already tracked.
+func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
+	if handle == nil {
+		return fmt.Errorf("error: handle cannot be nil")
+	}
+
+	if _, ok := d.tasks.Get(handle.Config.ID); ok {
+		return nil
+	}
+
+	var taskState TaskState
+	if err := handle.GetDriverState(&taskState); err != nil {
+		return fmt.Errorf("failed to decode task state from handle: %v", err)
+	}
+	if taskState.TaskConfig == nil {
+		return fmt.Errorf("failed to decode task state from handle: missing task config")
+	}
+
+	// Decoding validates that the stored driver config is still readable.
+	var driverConfig TaskConfig
+	if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
+		return fmt.Errorf("failed to decode driver config: %v", err)
+	}
+
+	c, err := lxc.NewContainer(taskState.ContainerName, d.lxcPath())
+	if err != nil {
+		return fmt.Errorf("failed to create container ref: %v", err)
+	}
+
+	initPid := c.InitPid()
+	h := &taskHandle{
+		container:  c,
+		initPid:    initPid,
+		taskConfig: taskState.TaskConfig,
+		procState:  drivers.TaskStateRunning,
+		startedAt:  taskState.StartedAt,
+		exitResult: &drivers.ExitResult{},
+		// run and stats log through the handle, so it must carry a logger.
+		logger: d.logger,
+
+		totalCpuStats:  stats.NewCpuStats(),
+		userCpuStats:   stats.NewCpuStats(),
+		systemCpuStats: stats.NewCpuStats(),
+	}
+
+	d.tasks.Set(taskState.TaskConfig.ID, h)
+
+	go h.run()
+	return nil
+}
+
+// StartTask creates, configures and starts a container for the task and
+// returns a handle carrying the state needed by RecoverTask.
+func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
+	if _, ok := d.tasks.Get(cfg.ID); ok {
+		return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
+	}
+
+	var driverConfig TaskConfig
+	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
+		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
+	}
+
+	d.logger.Info("starting lxc task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
+	handle := drivers.NewTaskHandle(pluginName)
+	handle.Config = cfg
+
+	c, err := d.initializeContainer(cfg, driverConfig)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	opt := toLXCCreateOptions(driverConfig)
+	if err := c.Create(opt); err != nil {
+		return nil, nil, fmt.Errorf("unable to create container: %v", err)
+	}
+
+	cleanup := func() {
+		// go-lxc will not destroy a running container, so stop it first when
+		// the failure happened after Start.
+		if c.Running() {
+			if err := c.Stop(); err != nil {
+				d.logger.Error("failed to stop container while cleaning up from an error in Start", "error", err)
+			}
+		}
+		if err := c.Destroy(); err != nil {
+			d.logger.Error("failed to clean up from an error in Start", "error", err)
+		}
+	}
+
+	if err := d.configureContainerNetwork(c); err != nil {
+		cleanup()
+		return nil, nil, err
+	}
+
+	if err := d.mountVolumes(c, cfg, driverConfig); err != nil {
+		cleanup()
+		return nil, nil, err
+	}
+
+	if err := c.Start(); err != nil {
+		cleanup()
+		return nil, nil, fmt.Errorf("unable to start container: %v", err)
+	}
+
+	if err := d.setResourceLimits(c, cfg); err != nil {
+		cleanup()
+		return nil, nil, err
+	}
+
+	pid := c.InitPid()
+
+	h := &taskHandle{
+		container:  c,
+		initPid:    pid,
+		taskConfig: cfg,
+		procState:  drivers.TaskStateRunning,
+		startedAt:  time.Now().Round(time.Millisecond),
+		logger:     d.logger,
+
+		totalCpuStats:  stats.NewCpuStats(),
+		userCpuStats:   stats.NewCpuStats(),
+		systemCpuStats: stats.NewCpuStats(),
+	}
+
+	driverState := TaskState{
+		ContainerName: c.Name(),
+		TaskConfig:    cfg,
+		StartedAt:     h.startedAt,
+	}
+
+	if err := handle.SetDriverState(&driverState); err != nil {
+		d.logger.Error("failed to start task, error setting driver state", "error", err)
+		cleanup()
+		return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
+	}
+
+	d.tasks.Set(cfg.ID, h)
+
+	go h.run()
+
+	return handle, nil, nil
+}
+
+// WaitTask returns a channel that receives the exit result of the task once it
+// has exited. The channel is closed afterwards or when ctx is cancelled.
+func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	ch := make(chan *drivers.ExitResult)
+	go d.handleWait(ctx, handle, ch)
+
+	return ch, nil
+}
+
+// handleWait polls the task status every two seconds and delivers the exit
+// result exactly once before returning.
+func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
+	defer close(ch)
+
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.ctx.Done():
+			return
+		case <-ticker.C:
+			s := handle.TaskStatus()
+			if s.State != drivers.TaskStateExited {
+				continue
+			}
+			// Use the result captured under the handle's lock by TaskStatus,
+			// and stop waiting if the receiver goes away.
+			select {
+			case ch <- s.ExitResult:
+			case <-ctx.Done():
+			case <-d.ctx.Done():
+			}
+			return
+		}
+	}
+}
+
+// StopTask asks the container to shut down within timeout and falls back to a
+// forced stop when that fails. The signal argument is not used.
+func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return drivers.ErrTaskNotFound
+	}
+
+	if err := handle.container.Shutdown(timeout); err != nil {
+		if err := handle.container.Stop(); err != nil {
+			return fmt.Errorf("executor Shutdown failed: %v", err)
+		}
+	}
+
+	return nil
+}
+
+// DestroyTask removes the task from the driver. A running task is only
+// destroyed, after being shut down, when force is set.
+func (d *Driver) DestroyTask(taskID string, force bool) error {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return drivers.ErrTaskNotFound
+	}
+
+	if handle.IsRunning() && !force {
+		return fmt.Errorf("cannot destroy running task")
+	}
+
+	if handle.IsRunning() {
+		if err := handle.container.Shutdown(0); err != nil {
+			d.logger.Error("failed to destroy executor", "err", err)
+		}
+	}
+
+	d.tasks.Delete(taskID)
+	return nil
+}
+
+// InspectTask returns the current status of the task.
+func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	return handle.TaskStatus(), nil
+}
+
+// TaskStats returns the current resource usage of the task's container.
+func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	return handle.stats()
+}
+
+// TaskEvents returns a channel of task events from the driver's eventer.
+func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
+	return d.eventer.TaskEvents(ctx)
+}
+
+// SignalTask is not supported by the LXC driver and always returns an error.
+func (d *Driver) SignalTask(taskID string, signal string) error {
+	return fmt.Errorf("LXC driver does not support signals")
+}
+
+// ExecTask is not supported by the LXC driver and always returns an error.
+func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
+	if len(cmd) == 0 {
+		return nil, fmt.Errorf("error cmd must have atleast one value")
+	}
+
+	return nil, fmt.Errorf("LXC driver does not support exec")
+}
diff --git a/drivers/lxc/driver_test.go b/drivers/lxc/driver_test.go
new file mode 100644
index 000000000..4889242d3
--- /dev/null
+++ b/drivers/lxc/driver_test.go
@@ -0,0 +1,273 @@
+// +build linux,lxc
+
+package lxc
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/hcl2/hcl"
+	ctestutil "github.com/hashicorp/nomad/client/testutil"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	"github.com/hashicorp/nomad/plugins/shared"
+	"github.com/hashicorp/nomad/plugins/shared/hclspec"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+	lxc "gopkg.in/lxc/go-lxc.v2"
+)
+
+func TestLXCDriver_Fingerprint(t *testing.T) {
+	t.Parallel()
+	requireLXC(t)
+
+	require := require.New(t)
+
+	d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
+	d.config.Enabled = true
+	harness := drivers.NewDriverHarness(t, d)
+
+	fingerCh, err := harness.Fingerprint(context.Background())
+	require.NoError(err)
+	select {
+	case finger := <-fingerCh:
+		require.Equal(drivers.HealthStateHealthy, finger.Health)
+		require.Equal("1", finger.Attributes["driver.lxc"])
+		require.NotEmpty(finger.Attributes["driver.lxc.version"])
+	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
+		require.Fail("timeout receiving fingerprint")
+	}
+}
+
+func TestLXCDriver_FingerprintNotEnabled(t *testing.T) {
+	t.Parallel()
+	requireLXC(t)
+
+	require := require.New(t)
+
+	d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
+	d.config.Enabled = false
+	harness := drivers.NewDriverHarness(t, d)
+
+	fingerCh, err := harness.Fingerprint(context.Background())
+	require.NoError(err)
+	select {
+	case finger := <-fingerCh:
+		require.Equal(drivers.HealthStateUndetected, finger.Health)
+		require.Equal("", finger.Attributes["driver.lxc"])
+		require.Empty(finger.Attributes["driver.lxc.version"])
+	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
+		require.Fail("timeout receiving fingerprint")
+	}
+}
+
+func TestLXCDriver_Start_Wait(t *testing.T) {
+	if !testutil.IsTravis() {
+		t.Parallel()
+	}
+	requireLXC(t)
+	ctestutil.RequireRoot(t)
+
+	require := require.New(t)
+
+	// prepare test file
+	testFileContents := []byte("this should be visible under /mnt/tmp")
+	tmpFile, err := ioutil.TempFile("/tmp", "testlxcdriver_start_wait")
+	if err != nil {
+		t.Fatalf("error creating temp file: %v", err)
+	}
+	defer os.Remove(tmpFile.Name())
+	if _, err := tmpFile.Write(testFileContents); err != nil {
+		t.Fatalf("error writing temp file: %v", err)
+	}
+	if err := tmpFile.Close(); err != nil {
+		t.Fatalf("error closing temp file: %v", err)
+	}
+
+	d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
+	d.config.Enabled = true
+	d.config.AllowVolumes = true
+
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+		Resources: &drivers.Resources{
+			NomadResources: &structs.Resources{
+				CPU:      1,
+				MemoryMB: 2,
+			},
+			LinuxResources: &drivers.LinuxResources{
+				CPUShares:        1024,
+				MemoryLimitBytes: 2 * 1024,
+			},
+		},
+	}
+	taskConfig := map[string]interface{}{
+		"template": "/usr/share/lxc/templates/lxc-busybox",
+		"volumes":  []string{"/tmp/:mnt/tmp"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+	fmt.Println(task.AllocDir)
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+	require.NotNil(handle)
+
+	lxcHandle, ok := d.tasks.Get(task.ID)
+	require.True(ok)
+
+	container := lxcHandle.container
+
+	// Destroy container after test
+	defer func() {
+		container.Stop()
+		container.Destroy()
+	}()
+
+	// Test that container is running
+	testutil.WaitForResult(func() (bool, error) {
+		state := container.State()
+		if state == lxc.RUNNING {
+			return true, nil
+		}
+		return false, fmt.Errorf("container in state: %v", state)
+	}, func(err error) {
+		t.Fatalf("container failed to start: %v", err)
+	})
+
+	// Test that directories are mounted in their proper location
+	containerName := container.Name()
+	for _, mnt := range []string{"alloc", "local", "secrets", "mnt/tmp"} {
+		fullpath := filepath.Join(d.lxcPath(), containerName, "rootfs", mnt)
+		stat, err := os.Stat(fullpath)
+		require.NoError(err)
+		require.True(stat.IsDir())
+	}
+
+	// Test bind mount volumes exist in container:
+	mountedContents, err := exec.Command("lxc-attach",
+		"-n", containerName, "--",
+		"cat", filepath.Join("/mnt/", tmpFile.Name()),
+	).Output()
+	require.NoError(err)
+	require.Equal(string(testFileContents), string(mountedContents))
+
+	// Test that killing container marks container as stopped
+	require.NoError(container.Stop())
+
+	testutil.WaitForResult(func() (bool, error) {
+		status, err := d.InspectTask(task.ID)
+		if err != nil {
+			// status is nil here and must not be dereferenced
+			return false, fmt.Errorf("failed to inspect task: %v", err)
+		}
+		if status.State == drivers.TaskStateExited {
+			return true, nil
+		}
+		return false, fmt.Errorf("task in state: %v", status.State)
+	}, func(err error) {
+		t.Fatalf("task was not marked as stopped: %v", err)
+	})
+}
+
+func TestLXCDriver_Start_Stop(t *testing.T) {
+	if !testutil.IsTravis() {
+		t.Parallel()
+	}
+	requireLXC(t)
+	ctestutil.RequireRoot(t)
+
+	require := require.New(t)
+
+	d := NewLXCDriver(testlog.HCLogger(t)).(*Driver)
+	d.config.Enabled = true
+	d.config.AllowVolumes = true
+
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+		Resources: &drivers.Resources{
+			NomadResources: &structs.Resources{
+				CPU:      1,
+				MemoryMB: 2,
+			},
+			LinuxResources: &drivers.LinuxResources{
+				CPUShares:        1024,
+				MemoryLimitBytes: 2 * 1024,
+			},
+		},
+	}
+	taskConfig := map[string]interface{}{
+		"template": "/usr/share/lxc/templates/lxc-busybox",
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+	fmt.Println(task.AllocDir)
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+	require.NotNil(handle)
+
+	lxcHandle, ok := d.tasks.Get(task.ID)
+	require.True(ok)
+
+	container := lxcHandle.container
+
+	// Destroy container after test
+	defer func() {
+		container.Stop()
+		container.Destroy()
+	}()
+
+	// Test that container is running
+	testutil.WaitForResult(func() (bool, error) {
+		state := container.State()
+		if state == lxc.RUNNING {
+			return true, nil
+		}
+		return false, fmt.Errorf("container in state: %v", state)
+	}, func(err error) {
+		t.Fatalf("container failed to start: %v", err)
+	})
+
+	require.NoError(d.StopTask(task.ID, 5*time.Second, "kill"))
+
+	testutil.WaitForResult(func() (bool, error) {
+		status, err := d.InspectTask(task.ID)
+		if err != nil {
+			// status is nil here and must not be dereferenced
+			return false, fmt.Errorf("failed to inspect task: %v", err)
+		}
+		if status.State == drivers.TaskStateExited {
+			return true, nil
+		}
+		return false, fmt.Errorf("task in state: %v", status.State)
+	}, func(err error) {
+		t.Fatalf("task was not marked as stopped: %v", err)
+	})
+}
+
+// requireLXC skips the test when no lxc version can be detected.
+func requireLXC(t *testing.T) {
+	if lxc.Version() == "" {
+		t.Skip("skipping, lxc not present")
+	}
+}
+
+// encodeDriverHelper parses taskConfig against taskConfigSpec and stores the
+// encoded result as the driver config of task.
+func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
+	evalCtx := &hcl.EvalContext{
+		Functions: shared.GetStdlibFuncs(),
+	}
+	spec, diag := hclspec.Convert(taskConfigSpec)
+	require.False(diag.HasErrors())
+	taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
+	require.False(diag.HasErrors())
+	err := task.EncodeDriverConfig(taskConfigCtyVal)
+	require.NoError(err)
+}
diff --git a/drivers/lxc/handle.go b/drivers/lxc/handle.go
new file mode 100644
index 000000000..5d5799d2f
--- /dev/null
+++ b/drivers/lxc/handle.go
@@ -0,0 +1,189 @@
+//+build linux,lxc
+
+package lxc
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/client/stats"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	lxc "gopkg.in/lxc/go-lxc.v2"
+)
+
+// taskHandle tracks a single container started or recovered by the driver.
+type taskHandle struct {
+	container *lxc.Container
+	initPid   int
+	logger    hclog.Logger
+
+	totalCpuStats  *stats.CpuStats
+	userCpuStats   *stats.CpuStats
+	systemCpuStats *stats.CpuStats
+
+	// stateLock syncs access to all fields below
+	stateLock sync.RWMutex
+
+	taskConfig  *drivers.TaskConfig
+	procState   drivers.TaskState
+	startedAt   time.Time
+	completedAt time.Time
+	exitResult  *drivers.ExitResult
+}
+
+var (
+	LXCMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"}
+
+	LXCMeasuredMemStats = []string{"RSS", "Cache", "Swap", "Max Usage", "Kernel Usage", "Kernel Max Usage"}
+)
+
+// TaskStatus returns a snapshot of the task state taken under the state lock.
+func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
+	h.stateLock.RLock()
+	defer h.stateLock.RUnlock()
+
+	return &drivers.TaskStatus{
+		ID:          h.taskConfig.ID,
+		Name:        h.taskConfig.Name,
+		State:       h.procState,
+		StartedAt:   h.startedAt,
+		CompletedAt: h.completedAt,
+		ExitResult:  h.exitResult,
+		DriverAttributes: map[string]string{
+			"pid": strconv.Itoa(h.initPid),
+		},
+	}
+}
+
+// IsRunning reports whether the task is still marked as running.
+func (h *taskHandle) IsRunning() bool {
+	h.stateLock.RLock()
+	defer h.stateLock.RUnlock()
+	return h.procState == drivers.TaskStateRunning
+}
+
+// run blocks until the container's init process is gone and then marks the
+// task as exited.
+func (h *taskHandle) run() {
+	h.stateLock.Lock()
+	if h.exitResult == nil {
+		h.exitResult = &drivers.ExitResult{}
+	}
+	h.stateLock.Unlock()
+
+	if ok, err := waitTillStopped(h.container); !ok {
+		h.logger.Error("failed to find container process", "error", err)
+		return
+	}
+
+	h.stateLock.Lock()
+	defer h.stateLock.Unlock()
+
+	h.procState = drivers.TaskStateExited
+	h.exitResult.ExitCode = 0
+	h.exitResult.Signal = 0
+	h.completedAt = time.Now()
+
+	// TODO: detect if the task OOMed
+}
+
+// stats collects cpu and memory usage of the container from lxc and its
+// memory cgroup. It returns an error when the cpu counters cannot be read;
+// unreadable memory items are logged and left at zero.
+func (h *taskHandle) stats() (*cstructs.TaskResourceUsage, error) {
+	cpuStats, err := h.container.CPUStats()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get container cpu stats: %v", err)
+	}
+	total, err := h.container.CPUTime()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get container cpu time: %v", err)
+	}
+
+	t := time.Now()
+
+	// Get the cpu stats. Each percentage is computed by its own tracker so
+	// that system and user samples do not overwrite each other's history.
+	system := cpuStats["system"]
+	user := cpuStats["user"]
+	cs := &cstructs.CpuStats{
+		SystemMode: h.systemCpuStats.Percent(float64(system)),
+		UserMode:   h.userCpuStats.Percent(float64(user)),
+		Percent:    h.totalCpuStats.Percent(float64(total)),
+		TotalTicks: float64(user + system),
+		Measured:   LXCMeasuredCpuStats,
+	}
+
+	// Get the Memory Stats
+	memData := map[string]uint64{
+		"rss":   0,
+		"cache": 0,
+		"swap":  0,
+	}
+	rawMemStats := h.container.CgroupItem("memory.stat")
+	for _, rawMemStat := range rawMemStats {
+		key, val, err := keysToVal(rawMemStat)
+		if err != nil {
+			h.logger.Error("failed to get stat", "line", rawMemStat, "error", err)
+			continue
+		}
+		// Only the keys pre-seeded above are of interest.
+		if _, ok := memData[key]; ok {
+			memData[key] = val
+		}
+	}
+	ms := &cstructs.MemoryStats{
+		RSS:      memData["rss"],
+		Cache:    memData["cache"],
+		Swap:     memData["swap"],
+		Measured: LXCMeasuredMemStats,
+	}
+
+	mu := h.container.CgroupItem("memory.max_usage_in_bytes")
+	for _, rawMemMaxUsage := range mu {
+		val, err := strconv.ParseUint(rawMemMaxUsage, 10, 64)
+		if err != nil {
+			h.logger.Error("failed to get max memory usage", "error", err)
+			continue
+		}
+		ms.MaxUsage = val
+	}
+	ku := h.container.CgroupItem("memory.kmem.usage_in_bytes")
+	for _, rawKernelUsage := range ku {
+		val, err := strconv.ParseUint(rawKernelUsage, 10, 64)
+		if err != nil {
+			h.logger.Error("failed to get kernel memory usage", "error", err)
+			continue
+		}
+		ms.KernelUsage = val
+	}
+
+	mku := h.container.CgroupItem("memory.kmem.max_usage_in_bytes")
+	for _, rawMaxKernelUsage := range mku {
+		val, err := strconv.ParseUint(rawMaxKernelUsage, 10, 64)
+		if err != nil {
+			h.logger.Error("failed to get max kernel memory usage", "error", err)
+			continue
+		}
+		ms.KernelMaxUsage = val
+	}
+
+	taskResUsage := cstructs.TaskResourceUsage{
+		ResourceUsage: &cstructs.ResourceUsage{
+			CpuStats:    cs,
+			MemoryStats: ms,
+		},
+		Timestamp: t.UTC().UnixNano(),
+	}
+
+	return &taskResUsage, nil
+}
+
+// keysToVal splits a "key value" line on a single space and parses the value
+// as an unsigned integer.
+func keysToVal(line string) (string, uint64, error) {
+	tokens := strings.Split(line, " ")
+	if len(tokens) != 2 {
+		return "", 0, fmt.Errorf("line isn't a k/v pair")
+	}
+	key := tokens[0]
+	val, err := strconv.ParseUint(tokens[1], 10, 64)
+	return key, val, err
+}
diff --git a/drivers/lxc/lxc.go b/drivers/lxc/lxc.go
new file mode 100644
index 000000000..f53fa14a2
--- /dev/null
+++ b/drivers/lxc/lxc.go
@@ -0,0 +1,172 @@
+//+build linux,lxc
+
+package lxc
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	lxc "gopkg.in/lxc/go-lxc.v2"
+)
+
+var (
+	// verbosityLevels maps the task config 'verbosity' value to lxc.
+	verbosityLevels = map[string]lxc.Verbosity{
+		"":        lxc.Quiet,
+		"verbose": lxc.Verbose,
+		"quiet":   lxc.Quiet,
+	}
+
+	// logLevels maps the task config 'log_level' value to lxc.
+	logLevels = map[string]lxc.LogLevel{
+		"":      lxc.ERROR,
+		"debug": lxc.DEBUG,
+		"error": lxc.ERROR,
+		"info":  lxc.INFO,
+		"trace": lxc.TRACE,
+		"warn":  lxc.WARN,
+	}
+)
+
+const (
+	// containerMonitorIntv is the interval at which the driver checks if the
+	// container is still alive
+	containerMonitorIntv = 2 * time.Second
+)
+
+// lxcPath returns the configured lxc path, or the lxc default when unset.
+func (d *Driver) lxcPath() string {
+	lxcPath := d.config.Path
+	if lxcPath == "" {
+		lxcPath = lxc.DefaultConfigPath()
+	}
+	return lxcPath
+}
+
+// initializeContainer creates a uniquely named container reference for the
+// task and applies its verbosity, log level and log file settings.
+func (d *Driver) initializeContainer(cfg *drivers.TaskConfig, taskConfig TaskConfig) (*lxc.Container, error) {
+	lxcPath := d.lxcPath()
+
+	containerName := fmt.Sprintf("%s-%s", cfg.Name, uuid.Generate())
+	c, err := lxc.NewContainer(containerName, lxcPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize container: %v", err)
+	}
+
+	if v, ok := verbosityLevels[taskConfig.Verbosity]; ok {
+		c.SetVerbosity(v)
+	} else {
+		return nil, fmt.Errorf("lxc driver config 'verbosity' can only be either quiet or verbose")
+	}
+
+	if v, ok := logLevels[taskConfig.LogLevel]; ok {
+		c.SetLogLevel(v)
+	} else {
+		return nil, fmt.Errorf("lxc driver config 'log_level' can only be trace, debug, info, warn or error")
+	}
+
+	logFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%v-lxc.log", cfg.Name))
+	c.SetLogFile(logFile)
+
+	return c, nil
+}
+
+// configureContainerNetwork disables container networking.
+func (d *Driver) configureContainerNetwork(c *lxc.Container) error {
+	// Set the network type to none
+	if err := c.SetConfigItem(networkTypeConfigKey(), "none"); err != nil {
+		return fmt.Errorf("error setting network type configuration: %v", err)
+	}
+	return nil
+}
+
+// networkTypeConfigKey returns the network type config key for the installed
+// lxc version.
+func networkTypeConfigKey() string {
+	if lxc.VersionAtLeast(2, 1, 0) {
+		return "lxc.net.0.type"
+	}
+
+	// prior to 2.1, network used
+	return "lxc.network.type"
+}
+
+// mountVolumes bind mounts the task's local, alloc and secrets directories
+// and every volume from the task config into the container.
+func (d *Driver) mountVolumes(c *lxc.Container, cfg *drivers.TaskConfig, taskConfig TaskConfig) error {
+	// Bind mount the shared alloc dir and task local dir in the container
+	mounts := []string{
+		fmt.Sprintf("%s local none rw,bind,create=dir", cfg.TaskDir().LocalDir),
+		fmt.Sprintf("%s alloc none rw,bind,create=dir", cfg.TaskDir().SharedAllocDir),
+		fmt.Sprintf("%s secrets none rw,bind,create=dir", cfg.TaskDir().SecretsDir),
+	}
+
+	volumesEnabled := d.config.AllowVolumes
+
+	for _, volDesc := range taskConfig.Volumes {
+		// Entries have the form "source:destination". Nothing validates them
+		// earlier, so reject malformed ones here instead of indexing out of
+		// range.
+		paths := strings.Split(volDesc, ":")
+		if len(paths) != 2 {
+			return fmt.Errorf("invalid volume bind mount entry: %q", volDesc)
+		}
+
+		// The destination is used as a mount target relative to the
+		// container, so an absolute path is not accepted.
+		if filepath.IsAbs(paths[1]) {
+			return fmt.Errorf("unsupported absolute container mount point: %q", paths[1])
+		}
+
+		if filepath.IsAbs(paths[0]) {
+			if !volumesEnabled {
+				return fmt.Errorf("absolute bind-mount volume in config but volumes are disabled")
+			}
+		} else {
+			// Relative source paths are treated as relative to alloc dir
+			paths[0] = filepath.Join(cfg.TaskDir().Dir, paths[0])
+		}
+
+		mounts = append(mounts, fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], paths[1]))
+	}
+
+	for _, mnt := range mounts {
+		if err := c.SetConfigItem("lxc.mount.entry", mnt); err != nil {
+			return fmt.Errorf("error setting bind mount %q error: %v", mnt, err)
+		}
+	}
+
+	return nil
+}
+
+// setResourceLimits applies the task's memory limit and cpu shares to the
+// container's cgroups.
+func (d *Driver) setResourceLimits(c *lxc.Container, cfg *drivers.TaskConfig) error {
+	if err := c.SetMemoryLimit(lxc.ByteSize(cfg.Resources.NomadResources.MemoryMB) * lxc.MB); err != nil {
+		return fmt.Errorf("unable to set memory limits: %v", err)
+	}
+
+	if err := c.SetCgroupItem("cpu.shares", strconv.FormatInt(cfg.Resources.LinuxResources.CPUShares, 10)); err != nil {
+		return fmt.Errorf("unable to set cpu shares: %v", err)
+	}
+
+	return nil
+}
+
+// toLXCCreateOptions converts the task config into lxc template options. Every
+// template-related task option is forwarded so that none is silently ignored.
+func toLXCCreateOptions(taskConfig TaskConfig) lxc.TemplateOptions {
+	return lxc.TemplateOptions{
+		Template:             taskConfig.Template,
+		Distro:               taskConfig.Distro,
+		Release:              taskConfig.Release,
+		Arch:                 taskConfig.Arch,
+		Variant:              taskConfig.ImageVariant,
+		Server:               taskConfig.ImageServer,
+		KeyID:                taskConfig.GPGKeyID,
+		KeyServer:            taskConfig.GPGKeyServer,
+		DisableGPGValidation: taskConfig.DisableGPGValidation,
+		FlushCache:           taskConfig.FlushCache,
+		ForceCache:           taskConfig.ForceCache,
+		ExtraArgs:            taskConfig.TemplateArgs,
+	}
+}
+
+// waitTillStopped blocks and returns true when container stops;
+// returns false with an error message if the container processes cannot be identified.
+//
+// Use this in preference to c.Wait() - lxc Wait() function holds a write lock on the container
+// blocking any other operation on container, including looking up container stats
+func waitTillStopped(c *lxc.Container) (bool, error) {
+	ps, err := os.FindProcess(c.InitPid())
+	if err != nil {
+		return false, err
+	}
+
+	for {
+		// Signal 0 delivers nothing; it only reports whether the process can
+		// still be signalled.
+		if err := ps.Signal(syscall.Signal(0)); err != nil {
+			return true, nil
+		}
+
+		time.Sleep(containerMonitorIntv)
+	}
+}
diff --git a/drivers/lxc/state.go b/drivers/lxc/state.go
new file mode 100644
index 000000000..c316a6cc6
--- /dev/null
+++ b/drivers/lxc/state.go
@@ -0,0 +1,35 @@
+//+build linux,lxc
+
+package lxc
+
+import (
+	"sync"
+)
+
+// taskStore is a mutex-guarded map from task ID to task handle.
+type taskStore struct {
+	store map[string]*taskHandle
+	lock  sync.RWMutex
+}
+
+func newTaskStore() *taskStore {
+	return &taskStore{store: map[string]*taskHandle{}}
+}
+
+func (ts *taskStore) Set(id string, handle *taskHandle) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	ts.store[id] = handle
+}
+
+func (ts *taskStore) Get(id string) (*taskHandle, bool) {
+	ts.lock.RLock()
+	defer ts.lock.RUnlock()
+	t, ok := ts.store[id]
+	return t, ok
+}
+
+func (ts *taskStore) Delete(id string) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	delete(ts.store, id)
+}
diff --git a/plugins/shared/catalog/register_lxc.go b/plugins/shared/catalog/register_lxc.go
new file mode 100644
index 000000000..6a029ec3f
--- /dev/null
+++ b/plugins/shared/catalog/register_lxc.go
@@ -0,0 +1,14 @@
+//+build linux,lxc
+
+package catalog
+
+import (
+	"github.com/hashicorp/nomad/drivers/lxc"
+)
+
+// This file is where all builtin plugins should be registered in the catalog.
+// Plugins with build restrictions should be placed in the appropriate
+// register_XXX.go file.
+func init() {
+	RegisterDeferredConfig(lxc.PluginID, lxc.PluginConfig, lxc.PluginLoader)
+}