mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Implement java driver
This commit is contained in:
564
drivers/java/driver.go
Normal file
564
drivers/java/driver.go
Normal file
@@ -0,0 +1,564 @@
|
||||
package java
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/consul-template/signals"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/client/driver/executor"
|
||||
dstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/drivers/shared/eventer"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/drivers/utils"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Identity, timing and node-attribute constants of the java driver.
const (
	// pluginName is the name this plugin is registered and logged under.
	pluginName = "java"

	// fingerprintPeriod is the pause between two consecutive fingerprint
	// responses sent by the driver.
	fingerprintPeriod = 30 * time.Second

	// driverAttr is the node attribute key whose presence indicates that the
	// java driver is available.
	driverAttr = "driver.java"

	// driverVersionAttr is the node attribute key that carries the detected
	// java version.
	driverVersionAttr = "driver.java.version"
)
|
||||
|
||||
var (
|
||||
// PluginID is the java plugin metadata registered in the plugin
|
||||
// catalog.
|
||||
PluginID = loader.PluginID{
|
||||
Name: pluginName,
|
||||
PluginType: base.PluginTypeDriver,
|
||||
}
|
||||
|
||||
// PluginConfig is the java driver factory function registered in the
|
||||
// plugin catalog.
|
||||
PluginConfig = &loader.InternalPluginConfig{
|
||||
Config: map[string]interface{}{},
|
||||
Factory: func(l hclog.Logger) interface{} { return NewDriver(l) },
|
||||
}
|
||||
|
||||
// pluginInfo is the response returned for the PluginInfo RPC
|
||||
pluginInfo = &base.PluginInfoResponse{
|
||||
Type: base.PluginTypeDriver,
|
||||
PluginApiVersion: "0.0.1",
|
||||
PluginVersion: "0.1.0",
|
||||
Name: pluginName,
|
||||
}
|
||||
|
||||
// configSpec is the hcl specification returned by the ConfigSchema RPC
|
||||
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{})
|
||||
|
||||
// taskConfigSpec is the hcl specification for the driver config section of
|
||||
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
|
||||
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
||||
// It's required for either `class` or `jar_path` to be set,
|
||||
// but that's not expressable in hclspec. Marking both as optional
|
||||
// and setting checking explicitly later
|
||||
"class": hclspec.NewAttr("class", "string", false),
|
||||
"classpath": hclspec.NewAttr("classpath", "string", false),
|
||||
"jar_path": hclspec.NewAttr("jar_path", "string", false),
|
||||
"java_opts": hclspec.NewAttr("java_opts", "list(string)", false),
|
||||
"args": hclspec.NewAttr("args", "list(string)", false),
|
||||
})
|
||||
|
||||
// capabilities is returned by the Capabilities RPC and indicates what
|
||||
// optional features this driver supports
|
||||
capabilities = &drivers.Capabilities{
|
||||
SendSignals: false,
|
||||
Exec: false,
|
||||
FSIsolation: cstructs.FSIsolationNone,
|
||||
}
|
||||
|
||||
_ drivers.DriverPlugin = (*Driver)(nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
if runtime.GOOS == "linux" {
|
||||
capabilities.FSIsolation = cstructs.FSIsolationChroot
|
||||
}
|
||||
}
|
||||
|
||||
// TaskConfig is the driver-specific configuration of a task within a job. It
// is decoded from the task's driver config and turned into the java command
// line by javaCmdArgs.
type TaskConfig struct {
	// Class is the class name placed on the command line after the JVM
	// options, classpath and jar.
	Class string `codec:"class"`

	// ClassPath, when set, is passed to java with "-cp".
	ClassPath string `codec:"classpath"`

	// JarPath, when set, is passed to java with "-jar".
	JarPath string `codec:"jar_path"`

	// JvmOpts are options placed first on the java command line.
	JvmOpts []string `codec:"java_opts"`

	// Args are extra arguments appended at the end of the java command line.
	Args []string `codec:"args"`
}
|
||||
|
||||
// TaskState is the state which is encoded in the handle returned in
|
||||
// StartTask. This information is needed to rebuild the taskConfig state and handler
|
||||
// during recovery.
|
||||
type TaskState struct {
|
||||
ReattachConfig *utils.ReattachConfig
|
||||
TaskConfig *drivers.TaskConfig
|
||||
Pid int
|
||||
StartedAt time.Time
|
||||
}
|
||||
|
||||
// Driver is a driver for running images via Java
|
||||
type Driver struct {
|
||||
// eventer is used to handle multiplexing of TaskEvents calls such that an
|
||||
// event can be broadcast to all callers
|
||||
eventer *eventer.Eventer
|
||||
|
||||
// tasks is the in memory datastore mapping taskIDs to taskHandle
|
||||
tasks *taskStore
|
||||
|
||||
// ctx is the context for the driver. It is passed to other subsystems to
|
||||
// coordinate shutdown
|
||||
ctx context.Context
|
||||
|
||||
// nomadConf is the client agent's configuration
|
||||
nomadConfig *base.ClientDriverConfig
|
||||
|
||||
// signalShutdown is called when the driver is shutting down and cancels the
|
||||
// ctx passed to any subsystems
|
||||
signalShutdown context.CancelFunc
|
||||
|
||||
// logger will log to the plugin output which is usually an 'executor.out'
|
||||
// file located in the root of the TaskDir
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
func NewDriver(logger hclog.Logger) drivers.DriverPlugin {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
logger = logger.Named(pluginName)
|
||||
return &Driver{
|
||||
eventer: eventer.NewEventer(ctx, logger),
|
||||
tasks: newTaskStore(),
|
||||
ctx: ctx,
|
||||
signalShutdown: cancel,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
|
||||
return pluginInfo, nil
|
||||
}
|
||||
|
||||
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
|
||||
return configSpec, nil
|
||||
}
|
||||
|
||||
func (d *Driver) SetConfig(_ []byte, cfg *base.ClientAgentConfig) error {
|
||||
if cfg != nil {
|
||||
d.nomadConfig = cfg.Driver
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
|
||||
return taskConfigSpec, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
|
||||
return capabilities, nil
|
||||
}
|
||||
|
||||
func (r *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
|
||||
ch := make(chan *drivers.Fingerprint)
|
||||
go r.handleFingerprint(ctx, ch)
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
|
||||
ticker := time.NewTimer(0)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ticker.Reset(fingerprintPeriod)
|
||||
ch <- d.buildFingerprint()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
||||
fp := &drivers.Fingerprint{
|
||||
Attributes: map[string]string{},
|
||||
Health: drivers.HealthStateHealthy,
|
||||
HealthDescription: "healthy",
|
||||
}
|
||||
|
||||
if runtime.GOOS == "linux" {
|
||||
// Only enable if w are root and cgroups are mounted when running on linux system
|
||||
if syscall.Geteuid() != 0 {
|
||||
fp.Health = drivers.HealthStateUnhealthy
|
||||
fp.HealthDescription = "java driver must run as root"
|
||||
return fp
|
||||
}
|
||||
|
||||
mount, err := fingerprint.FindCgroupMountpointDir()
|
||||
if err != nil {
|
||||
fp.Health = drivers.HealthStateUnhealthy
|
||||
fp.HealthDescription = "failed to discover cgroup mount point"
|
||||
d.logger.Warn(fp.HealthDescription, "error", err)
|
||||
return fp
|
||||
}
|
||||
|
||||
if mount == "" {
|
||||
fp.Health = drivers.HealthStateUnhealthy
|
||||
fp.HealthDescription = "cgroups are unavailable"
|
||||
return fp
|
||||
}
|
||||
}
|
||||
|
||||
version, runtime, vm, err := javaVersionInfo()
|
||||
if err != nil {
|
||||
// return no error, as it isn't an error to not find java, it just means we
|
||||
// can't use it.
|
||||
fp.Health = drivers.HealthStateUndetected
|
||||
fp.HealthDescription = ""
|
||||
return fp
|
||||
}
|
||||
|
||||
fp.Attributes[driverAttr] = "1"
|
||||
fp.Attributes[driverVersionAttr] = version
|
||||
fp.Attributes["driver.java.runtime"] = runtime
|
||||
fp.Attributes["driver.java.vm"] = vm
|
||||
|
||||
return fp
|
||||
}
|
||||
|
||||
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||
if handle == nil {
|
||||
return fmt.Errorf("error: handle cannot be nil")
|
||||
}
|
||||
|
||||
var taskState TaskState
|
||||
if err := handle.GetDriverState(&taskState); err != nil {
|
||||
d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)
|
||||
return fmt.Errorf("failed to decode taskConfig state from handle: %v", err)
|
||||
}
|
||||
|
||||
plugRC, err := utils.ReattachConfigToGoPlugin(taskState.ReattachConfig)
|
||||
if err != nil {
|
||||
d.logger.Error("failed to build ReattachConfig from taskConfig state", "error", err, "task_id", handle.Config.ID)
|
||||
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
|
||||
}
|
||||
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Reattach: plugRC,
|
||||
}
|
||||
|
||||
execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
|
||||
if err != nil {
|
||||
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
|
||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||
}
|
||||
|
||||
h := &taskHandle{
|
||||
exec: execImpl,
|
||||
pid: taskState.Pid,
|
||||
pluginClient: pluginClient,
|
||||
taskConfig: taskState.TaskConfig,
|
||||
procState: drivers.TaskStateRunning,
|
||||
startedAt: taskState.StartedAt,
|
||||
exitResult: &drivers.ExitResult{},
|
||||
}
|
||||
|
||||
d.tasks.Set(taskState.TaskConfig.ID, h)
|
||||
|
||||
go h.run()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
|
||||
if _, ok := d.tasks.Get(cfg.ID); ok {
|
||||
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
|
||||
}
|
||||
|
||||
var driverConfig TaskConfig
|
||||
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
|
||||
}
|
||||
|
||||
if driverConfig.Class == "" && driverConfig.JarPath == "" {
|
||||
return nil, nil, fmt.Errorf("jar_path or class must be specified")
|
||||
}
|
||||
|
||||
absPath, err := GetAbsolutePath("java")
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to find java binary: %s", err)
|
||||
}
|
||||
|
||||
args := javaCmdArgs(driverConfig)
|
||||
|
||||
d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args)
|
||||
|
||||
handle := drivers.NewTaskHandle(pluginName)
|
||||
handle.Config = cfg
|
||||
|
||||
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
|
||||
executorConfig := &dstructs.ExecutorConfig{
|
||||
LogFile: pluginLogFile,
|
||||
LogLevel: "debug",
|
||||
}
|
||||
|
||||
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
|
||||
}
|
||||
|
||||
execCmd := &executor.ExecCommand{
|
||||
Cmd: absPath,
|
||||
Args: args,
|
||||
Env: cfg.EnvList(),
|
||||
User: cfg.User,
|
||||
ResourceLimits: true,
|
||||
Resources: &executor.Resources{
|
||||
CPU: int(cfg.Resources.LinuxResources.CPUShares),
|
||||
MemoryMB: int(drivers.BytesToMB(cfg.Resources.LinuxResources.MemoryLimitBytes)),
|
||||
DiskMB: cfg.Resources.NomadResources.DiskMB,
|
||||
},
|
||||
TaskDir: cfg.TaskDir().Dir,
|
||||
StdoutPath: cfg.StdoutPath,
|
||||
StderrPath: cfg.StderrPath,
|
||||
}
|
||||
|
||||
ps, err := exec.Launch(execCmd)
|
||||
if err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
|
||||
}
|
||||
|
||||
h := &taskHandle{
|
||||
exec: exec,
|
||||
pid: ps.Pid,
|
||||
pluginClient: pluginClient,
|
||||
taskConfig: cfg,
|
||||
procState: drivers.TaskStateRunning,
|
||||
startedAt: time.Now().Round(time.Millisecond),
|
||||
logger: d.logger,
|
||||
}
|
||||
|
||||
driverState := TaskState{
|
||||
ReattachConfig: utils.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
|
||||
Pid: ps.Pid,
|
||||
TaskConfig: cfg,
|
||||
StartedAt: h.startedAt,
|
||||
}
|
||||
|
||||
if err := handle.SetDriverState(&driverState); err != nil {
|
||||
d.logger.Error("failed to start task, error setting driver state", "error", err)
|
||||
exec.Shutdown("", 0)
|
||||
pluginClient.Kill()
|
||||
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
|
||||
}
|
||||
|
||||
d.tasks.Set(cfg.ID, h)
|
||||
go h.run()
|
||||
return handle, nil, nil
|
||||
}
|
||||
|
||||
func javaCmdArgs(driverConfig TaskConfig) []string {
|
||||
args := []string{}
|
||||
// Look for jvm options
|
||||
if len(driverConfig.JvmOpts) != 0 {
|
||||
args = append(args, driverConfig.JvmOpts...)
|
||||
}
|
||||
|
||||
// Add the classpath
|
||||
if driverConfig.ClassPath != "" {
|
||||
args = append(args, "-cp", driverConfig.ClassPath)
|
||||
}
|
||||
|
||||
// Add the jar
|
||||
if driverConfig.JarPath != "" {
|
||||
args = append(args, "-jar", driverConfig.JarPath)
|
||||
}
|
||||
|
||||
// Add the class
|
||||
if driverConfig.Class != "" {
|
||||
args = append(args, driverConfig.Class)
|
||||
}
|
||||
|
||||
// Add any args
|
||||
if len(driverConfig.Args) != 0 {
|
||||
args = append(args, driverConfig.Args...)
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return nil, drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
ch := make(chan *drivers.ExitResult)
|
||||
go d.handleWait(ctx, handle, ch)
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
|
||||
defer close(ch)
|
||||
var result *drivers.ExitResult
|
||||
ps, err := handle.exec.Wait()
|
||||
if err != nil {
|
||||
result = &drivers.ExitResult{
|
||||
Err: fmt.Errorf("executor: error waiting on process: %v", err),
|
||||
}
|
||||
} else {
|
||||
result = &drivers.ExitResult{
|
||||
ExitCode: ps.ExitCode,
|
||||
Signal: ps.Signal,
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
case ch <- result:
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
if err := handle.exec.Shutdown(signal, timeout); err != nil {
|
||||
if handle.pluginClient.Exited() {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("executor Shutdown failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) DestroyTask(taskID string, force bool) error {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
if handle.IsRunning() && !force {
|
||||
return fmt.Errorf("cannot destroy running task")
|
||||
}
|
||||
|
||||
if !handle.pluginClient.Exited() {
|
||||
if handle.IsRunning() {
|
||||
if err := handle.exec.Shutdown("", 0); err != nil {
|
||||
handle.logger.Error("destroying executor failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
handle.pluginClient.Kill()
|
||||
}
|
||||
|
||||
d.tasks.Delete(taskID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return nil, drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
handle.stateLock.RLock()
|
||||
defer handle.stateLock.RUnlock()
|
||||
|
||||
status := &drivers.TaskStatus{
|
||||
ID: handle.taskConfig.ID,
|
||||
Name: handle.taskConfig.Name,
|
||||
State: handle.procState,
|
||||
StartedAt: handle.startedAt,
|
||||
CompletedAt: handle.completedAt,
|
||||
ExitResult: handle.exitResult,
|
||||
DriverAttributes: map[string]string{
|
||||
"pid": strconv.Itoa(handle.pid),
|
||||
},
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return nil, drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
return handle.exec.Stats()
|
||||
}
|
||||
|
||||
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
|
||||
return d.eventer.TaskEvents(ctx)
|
||||
}
|
||||
|
||||
func (d *Driver) SignalTask(taskID string, signal string) error {
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
sig := os.Interrupt
|
||||
if s, ok := signals.SignalLookup[signal]; ok {
|
||||
d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.taskConfig.ID)
|
||||
sig = s
|
||||
}
|
||||
return handle.exec.Signal(sig)
|
||||
}
|
||||
|
||||
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
|
||||
if len(cmd) == 0 {
|
||||
return nil, fmt.Errorf("error cmd must have at least one value")
|
||||
}
|
||||
handle, ok := d.tasks.Get(taskID)
|
||||
if !ok {
|
||||
return nil, drivers.ErrTaskNotFound
|
||||
}
|
||||
|
||||
out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], cmd[1:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &drivers.ExecTaskResult{
|
||||
Stdout: out,
|
||||
ExitResult: &drivers.ExitResult{
|
||||
ExitCode: exitCode,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetAbsolutePath locates bin via the executable search path and returns the
// location with all symbolic links resolved. It returns an empty path and an
// error when bin cannot be found; a failure to resolve symlinks is returned
// as reported by filepath.EvalSymlinks.
func GetAbsolutePath(bin string) (string, error) {
	located, lookErr := exec.LookPath(bin)
	if lookErr != nil {
		return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, lookErr)
	}

	resolved, err := filepath.EvalSymlinks(located)
	return resolved, err
}
|
||||
Reference in New Issue
Block a user