Merge pull request #4961 from hashicorp/f-grpc-executor

GRPC Executor
Nick Ethier
2018-12-15 00:34:36 -05:00
committed by GitHub
36 changed files with 1874 additions and 457 deletions

View File

@@ -8,8 +8,8 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/executor"
)
type ExecutorPluginCommand struct {
@@ -49,6 +49,7 @@ func (e *ExecutorPluginCommand) Run(args []string) int {
hclog.LevelFromString(executorConfig.LogLevel),
executorConfig.FSIsolation,
),
GRPCServer: plugin.DefaultGRPCServer,
})
return 0
}
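For context, this hunk adds gRPC serving to the executor command's existing plugin.Serve call. A plausible reconstruction of the full call, assuming the elided fields keep their pre-existing values (only the GRPCServer field is new here):

package main

import (
	"os"

	hclog "github.com/hashicorp/go-hclog"
	plugin "github.com/hashicorp/go-plugin"
	"github.com/hashicorp/nomad/drivers/shared/executor"
	"github.com/hashicorp/nomad/plugins/base"
)

// serveExecutor sketches how the executor subcommand plausibly serves the
// plugin after this change; the argument values are illustrative.
func serveExecutor(logLevel string, fsIsolation bool) {
	plugin.Serve(&plugin.ServeConfig{
		HandshakeConfig: base.Handshake, // assumed: the shared Nomad plugin handshake
		Plugins: executor.GetPluginMap(
			os.Stdout,
			hclog.LevelFromString(logLevel),
			fsIsolation,
		),
		// New in this PR: expose the executor over gRPC.
		GRPCServer: plugin.DefaultGRPCServer,
	})
}

func main() { serveExecutor("debug", false) }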

View File

@@ -1849,9 +1849,9 @@ func TestDockerDriver_Mounts(t *testing.T) {
},
}
d := dockerDriverHarness(t, nil)
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
d := dockerDriverHarness(t, nil)
// Build the task
task, cfg, _ := dockerTask(t)
cfg.Command = "/bin/sleep"

View File

@@ -18,7 +18,6 @@ import (
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
@@ -249,7 +248,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
Reattach: plugRC,
}
exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
exec, pluginClient, err := executor.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
@@ -286,14 +285,14 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &pexecutor.ExecutorConfig{
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
FSIsolation: true,
}
// TODO: best way to pass port ranges in from client config
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
exec, pluginClient, err := executor.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
@@ -304,7 +303,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
Env: cfg.EnvList(),
User: cfg.User,
ResourceLimits: true,
Resources: toExecResources(cfg.Resources),
Resources: cfg.Resources,
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
@@ -347,19 +346,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
return handle, nil, nil
}
func toExecResources(resources *drivers.Resources) *executor.Resources {
if resources == nil || resources.NomadResources == nil {
return nil
}
return &executor.Resources{
CPU: resources.NomadResources.CPU,
MemoryMB: resources.NomadResources.MemoryMB,
DiskMB: resources.NomadResources.DiskMB,
}
}
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
@@ -375,7 +361,7 @@ func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.E
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
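The drivers now thread their WaitTask context into the executor, so abandoning a wait no longer tears down the task. A condensed, runnable sketch of the new handleWait flow (exitResult and waiter are stand-ins for the drivers.ExitResult and executor types):

package main

import (
	"context"
	"fmt"
	"time"
)

// exitResult stands in for drivers.ExitResult in this sketch.
type exitResult struct {
	ExitCode int
	Err      error
}

// waiter stands in for the executor client: Wait now takes a context, so a
// driver can abandon the wait without touching the underlying process.
type waiter interface {
	Wait(ctx context.Context) (*exitResult, error)
}

func handleWait(ctx context.Context, exec waiter, ch chan *exitResult) {
	defer close(ch)
	ps, err := exec.Wait(ctx)
	if err != nil {
		ch <- &exitResult{Err: fmt.Errorf("executor: error waiting on process: %v", err)}
		return
	}
	ch <- ps
}

// fakeExec never exits; its Wait unblocks only via the context.
type fakeExec struct{}

func (fakeExec) Wait(ctx context.Context) (*exitResult, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ch := make(chan *exitResult, 1)
	go handleWait(ctx, fakeExec{}, ch)
	fmt.Println((<-ch).Err) // executor: error waiting on process: context deadline exceeded
}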

View File

@@ -108,7 +108,6 @@ func TestExecDriver_StartWait(t *testing.T) {
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
fmt.Println(task.AllocDir)
handle, _, err := harness.StartTask(task)
require.NoError(err)

View File

@@ -1,6 +1,7 @@
package exec
import (
"context"
"strconv"
"sync"
"time"
@@ -58,7 +59,7 @@ func (h *taskHandle) run() {
h.stateLock.Unlock()
// Block until process exits
ps, err := h.exec.Wait()
ps, err := h.exec.Wait(context.Background())
h.stateLock.Lock()
defer h.stateLock.Unlock()

View File

@@ -19,8 +19,6 @@ import (
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
@@ -271,7 +269,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
Reattach: plugRC,
}
execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
@@ -320,12 +318,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &pexecutor.ExecutorConfig{
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
}
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
exec, pluginClient, err := executor.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
@@ -336,16 +334,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
Env: cfg.EnvList(),
User: cfg.User,
ResourceLimits: true,
Resources: &executor.Resources{
CPU: cfg.Resources.NomadResources.CPU,
MemoryMB: cfg.Resources.NomadResources.MemoryMB,
DiskMB: cfg.Resources.NomadResources.DiskMB,
},
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
Mounts: cfg.Mounts,
Devices: cfg.Devices,
Resources: cfg.Resources,
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
Mounts: cfg.Mounts,
Devices: cfg.Devices,
}
ps, err := exec.Launch(execCmd)
@@ -428,7 +422,7 @@ func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.E
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),

View File

@@ -1,6 +1,7 @@
package java
import (
"context"
"strconv"
"sync"
"time"
@@ -57,7 +58,7 @@ func (h *taskHandle) run() {
}
h.stateLock.Unlock()
ps, err := h.exec.Wait()
ps, err := h.exec.Wait(context.Background())
h.stateLock.Lock()
defer h.stateLock.Unlock()

View File

@@ -21,8 +21,6 @@ import (
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
@@ -270,7 +268,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
Reattach: plugRC,
}
execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
@@ -414,12 +412,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
d.logger.Debug("starting QemuVM command ", "args", strings.Join(args, " "))
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%s-executor.out", cfg.Name))
executorConfig := &pexecutor.ExecutorConfig{
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
}
execImpl, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
execImpl, pluginClient, err := executor.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, err
}
@@ -584,7 +582,7 @@ func GetAbsolutePath(bin string) (string, error) {
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),

View File

@@ -1,6 +1,7 @@
package qemu
import (
"context"
"strconv"
"sync"
"time"
@@ -58,7 +59,7 @@ func (h *taskHandle) run() {
}
h.stateLock.Unlock()
ps, err := h.exec.Wait()
ps, err := h.exec.Wait(context.Background())
h.stateLock.Lock()
defer h.stateLock.Unlock()

View File

@@ -18,8 +18,6 @@ import (
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
@@ -276,7 +274,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
}
// Create client for reattached executor
exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
exec, pluginClient, err := executor.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
@@ -313,12 +311,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &pexecutor.ExecutorConfig{
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
}
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
exec, pluginClient, err := executor.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
@@ -388,7 +386,7 @@ func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.E
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
@@ -479,8 +477,9 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.taskConfig.ID)
sig = s
} else {
d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.taskConfig.ID)
}
return handle.exec.Signal(sig)
}
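This hunk also fixes an inverted warning in SignalTask: previously the "unknown signal" message logged whenever the signal was found; now it logs only when the lookup fails. A self-contained rendering of the corrected logic:

package main

import (
	"fmt"
	"os"
	"syscall"
)

// resolveSignal mirrors the corrected SignalTask logic: use the looked-up
// signal when known, otherwise warn and fall back to SIGINT.
func resolveSignal(lookup map[string]os.Signal, name string, warn func(string)) os.Signal {
	sig := os.Interrupt
	if s, ok := lookup[name]; ok {
		sig = s
	} else {
		warn("signal to send to task unknown, using SIGINT")
	}
	return sig
}

func main() {
	lookup := map[string]os.Signal{"SIGTERM": syscall.SIGTERM}
	fmt.Println(resolveSignal(lookup, "SIGTERM", func(string) {}))                    // terminated
	fmt.Println(resolveSignal(lookup, "SIGBOGUS", func(m string) { fmt.Println(m) })) // warns, then interrupt
}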

View File

@@ -1,6 +1,7 @@
package rawexec
import (
"context"
"strconv"
"sync"
"time"
@@ -58,7 +59,7 @@ func (h *taskHandle) run() {
h.stateLock.Unlock()
// Block until process exits
ps, err := h.exec.Wait()
ps, err := h.exec.Wait(context.Background())
h.stateLock.Lock()
defer h.stateLock.Unlock()

View File

@@ -31,8 +31,6 @@ import (
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
@@ -344,7 +342,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
Reattach: plugRC,
}
execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
@@ -619,12 +617,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
}
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%s-executor.out", cfg.Name))
executorConfig := &pexecutor.ExecutorConfig{
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
}
execImpl, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
execImpl, pluginClient, err := executor.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, err
}
@@ -662,15 +660,11 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru
Cmd: absPath,
Args: runArgs,
ResourceLimits: true,
Resources: &executor.Resources{
CPU: int(cfg.Resources.LinuxResources.CPUShares),
MemoryMB: int(drivers.BytesToMB(cfg.Resources.LinuxResources.MemoryLimitBytes)),
DiskMB: cfg.Resources.NomadResources.DiskMB,
},
Env: rktEnv.List(),
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
Resources: cfg.Resources,
Env: cfg.EnvList(),
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
}
ps, err := execImpl.Launch(execCmd)
if err != nil {
@@ -1010,7 +1004,7 @@ func elide(inBuf bytes.Buffer) string {
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),

View File

@@ -3,6 +3,7 @@
package rkt
import (
"context"
"strconv"
"sync"
"time"
@@ -62,7 +63,7 @@ func (h *taskHandle) run() {
}
h.stateLock.Unlock()
ps, err := h.exec.Wait()
ps, err := h.exec.Wait(context.Background())
h.stateLock.Lock()
defer h.stateLock.Unlock()

View File

@@ -0,0 +1,152 @@
package executor
import (
"context"
"fmt"
"os"
"syscall"
"time"
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/plugins/drivers"
)
var _ Executor = (*grpcExecutorClient)(nil)
type grpcExecutorClient struct {
client proto.ExecutorClient
// doneCtx is closed when the plugin exits
doneCtx context.Context
}
func (c *grpcExecutorClient) Launch(cmd *ExecCommand) (*ProcessState, error) {
ctx := context.Background()
req := &proto.LaunchRequest{
Cmd: cmd.Cmd,
Args: cmd.Args,
Resources: drivers.ResourcesToProto(cmd.Resources),
StdoutPath: cmd.StdoutPath,
StderrPath: cmd.StderrPath,
Env: cmd.Env,
User: cmd.User,
TaskDir: cmd.TaskDir,
ResourceLimits: cmd.ResourceLimits,
BasicProcessCgroup: cmd.BasicProcessCgroup,
Mounts: drivers.MountsToProto(cmd.Mounts),
Devices: drivers.DevicesToProto(cmd.Devices),
}
resp, err := c.client.Launch(ctx, req)
if err != nil {
return nil, err
}
ps, err := processStateFromProto(resp.Process)
if err != nil {
return nil, err
}
return ps, nil
}
func (c *grpcExecutorClient) Wait(ctx context.Context) (*ProcessState, error) {
// Join the passed context and the shutdown context
ctx, _ = joincontext.Join(ctx, c.doneCtx)
resp, err := c.client.Wait(ctx, &proto.WaitRequest{})
if err != nil {
return nil, err
}
ps, err := processStateFromProto(resp.Process)
if err != nil {
return nil, err
}
return ps, nil
}
func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) error {
ctx := context.Background()
req := &proto.ShutdownRequest{
Signal: signal,
GracePeriod: gracePeriod.Nanoseconds(),
}
if _, err := c.client.Shutdown(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) UpdateResources(r *drivers.Resources) error {
ctx := context.Background()
req := &proto.UpdateResourcesRequest{Resources: drivers.ResourcesToProto(r)}
if _, err := c.client.UpdateResources(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) Version() (*ExecutorVersion, error) {
ctx := context.Background()
resp, err := c.client.Version(ctx, &proto.VersionRequest{})
if err != nil {
return nil, err
}
return &ExecutorVersion{Version: resp.Version}, nil
}
func (c *grpcExecutorClient) Stats() (*cstructs.TaskResourceUsage, error) {
ctx := context.Background()
resp, err := c.client.Stats(ctx, &proto.StatsRequest{})
if err != nil {
return nil, err
}
stats, err := drivers.TaskStatsFromProto(resp.Stats)
if err != nil {
return nil, err
}
return stats, nil
}
func (c *grpcExecutorClient) Signal(s os.Signal) error {
ctx := context.Background()
sig, ok := s.(syscall.Signal)
if !ok {
return fmt.Errorf("unsupported signal type: %q", s.String())
}
req := &proto.SignalRequest{
Signal: int32(sig),
}
if _, err := c.client.Signal(ctx, req); err != nil {
return err
}
return nil
}
func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) {
ctx := context.Background()
pbDeadline, err := ptypes.TimestampProto(deadline)
if err != nil {
return nil, 0, err
}
req := &proto.ExecRequest{
Deadline: pbDeadline,
Cmd: cmd,
Args: args,
}
resp, err := c.client.Exec(ctx, req)
if err != nil {
return nil, 0, err
}
return resp.Output, int(resp.ExitCode), nil
}
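Note how Wait joins the caller's context with doneCtx, so the RPC also unblocks if the plugin process itself exits. A stand-alone approximation of that join, assuming only that joincontext.Join returns a context that is done as soon as either parent is:

package main

import (
	"context"
	"fmt"
	"time"
)

// join approximates joincontext.Join: the returned context is cancelled as
// soon as either parent context is done.
func join(a, b context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		select {
		case <-a.Done():
		case <-b.Done():
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	caller, callerCancel := context.WithCancel(context.Background())
	defer callerCancel()
	doneCtx, pluginExited := context.WithCancel(context.Background()) // stands in for the plugin's doneCtx

	joined, cancel := join(caller, doneCtx)
	defer cancel()

	pluginExited() // simulate the plugin process going away
	select {
	case <-joined.Done():
		fmt.Println("Wait unblocked:", joined.Err())
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}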

View File

@@ -20,9 +20,10 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/fifo"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/plugins/drivers"
cstructs "github.com/hashicorp/nomad/client/structs"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/drivers"
)
const (
@@ -47,7 +48,7 @@ type Executor interface {
Launch(launchCmd *ExecCommand) (*ProcessState, error)
// Wait blocks until the process exits or an error occurs
Wait() (*ProcessState, error)
Wait(ctx context.Context) (*ProcessState, error)
// Shutdown will shut down the executor by stopping the user process and
// cleaning up any resources created by the executor. The shutdown sequence
@@ -61,7 +62,7 @@ type Executor interface {
// UpdateResources updates any resource isolation enforcement with new
// constraints if supported.
UpdateResources(*Resources) error
UpdateResources(*drivers.Resources) error
// Version returns the executor API version
Version() (*ExecutorVersion, error)
@@ -77,13 +78,6 @@ type Executor interface {
Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error)
}
// Resources describes the resource isolation required
type Resources struct {
CPU int
MemoryMB int
DiskMB int
}
// ExecCommand holds the user command, args, and other isolation related
// settings.
type ExecCommand struct {
@@ -94,13 +88,13 @@ type ExecCommand struct {
Args []string
// Resources defined by the task
Resources *Resources
Resources *drivers.Resources
// StdoutPath is the path the procoess stdout should be written to
// StdoutPath is the path the process stdout should be written to
StdoutPath string
stdout io.WriteCloser
// StderrPath is the path the procoess stderr should be written to
// StderrPath is the path the process stderr should be written to
StderrPath string
stderr io.WriteCloser
@@ -129,6 +123,20 @@ type ExecCommand struct {
Devices []*drivers.DeviceConfig
}
// SetWriters sets the writers for the process stdout and stderr. This should
// not be used if writing to a file path such as a fifo file. SetWriters
// is mainly used for unit testing purposes.
func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) {
c.stdout = out
c.stderr = err
}
// GetWriters returns the unexported io.WriteCloser for the stdout and stderr
// handles. This is mainly used for unit testing purposes.
func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) {
return c.stdout, c.stderr
}
type nopCloser struct {
io.Writer
}
@@ -350,12 +358,16 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy
}
// Wait waits until a process has exited and returns its exit code and errors
func (e *UniversalExecutor) Wait() (*ProcessState, error) {
<-e.processExited
return e.exitState, nil
func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.processExited:
return e.exitState, nil
}
}
func (e *UniversalExecutor) UpdateResources(resources *Resources) error {
func (e *UniversalExecutor) UpdateResources(resources *drivers.Resources) error {
return nil
}
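This interface change is the core of the PR: Wait now takes a context, which lets the gRPC client abandon a pending Wait RPC without shutting down the executor or losing the eventual exit state. A minimal runnable sketch of the new select:

package main

import (
	"context"
	"fmt"
	"time"
)

// wait mirrors the new UniversalExecutor.Wait contract: return early with
// ctx.Err() when the caller gives up, otherwise return when the process exits.
func wait(ctx context.Context, processExited <-chan struct{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-processExited:
		return nil
	}
}

func main() {
	exited := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// No process exit in this sketch, so the wait ends via the context.
	fmt.Println(wait(ctx, exited)) // context deadline exceeded
}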

View File

@@ -2,7 +2,9 @@
package executor
import hclog "github.com/hashicorp/go-hclog"
import (
hclog "github.com/hashicorp/go-hclog"
)
func NewExecutorWithIsolation(logger hclog.Logger) Executor {
logger = logger.Named("executor")

View File

@@ -22,6 +22,7 @@ import (
"github.com/hashicorp/nomad/helper/discover"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups"
@@ -103,7 +104,9 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
}
if command.Resources == nil {
command.Resources = &Resources{}
command.Resources = &drivers.Resources{
NomadResources: &structs.Resources{},
}
}
l.command = command
@@ -132,6 +135,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
if err != nil {
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
@@ -237,9 +241,13 @@ func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) {
}
// Wait waits until a process has exited and returns its exit code and errors
func (l *LibcontainerExecutor) Wait() (*ProcessState, error) {
<-l.userProcExited
return l.exitState, nil
func (l *LibcontainerExecutor) Wait(ctx context.Context) (*ProcessState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.userProcExited:
return l.exitState, nil
}
}
func (l *LibcontainerExecutor) wait() {
@@ -337,7 +345,7 @@ func (l *LibcontainerExecutor) Shutdown(signal string, grace time.Duration) erro
}
// UpdateResources updates the resource isolation with new values to be enforced
func (l *LibcontainerExecutor) UpdateResources(resources *Resources) error {
func (l *LibcontainerExecutor) UpdateResources(resources *drivers.Resources) error {
return nil
}
@@ -463,7 +471,7 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc
ch <- &waitResult{ps, err}
}
func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) {
func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) error {
// TODO: allow better control of these
cfg.Capabilities = &lconfigs.Capabilities{
Bounding: allCaps,
@@ -473,6 +481,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) {
Effective: allCaps,
}
return nil
}
func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error {
@@ -564,20 +573,25 @@ func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error {
id := uuid.Generate()
cfg.Cgroups.Path = filepath.Join(defaultCgroupParent, id)
if command.Resources.MemoryMB > 0 {
if command.Resources == nil || command.Resources.NomadResources == nil {
return nil
}
if command.Resources.NomadResources.MemoryMB > 0 {
// Total amount of memory allowed to consume
cfg.Cgroups.Resources.Memory = int64(command.Resources.MemoryMB * 1024 * 1024)
cfg.Cgroups.Resources.Memory = int64(command.Resources.NomadResources.MemoryMB * 1024 * 1024)
// Disable swap to avoid issues on the machine
var memSwappiness uint64 = 0
var memSwappiness uint64
cfg.Cgroups.Resources.MemorySwappiness = &memSwappiness
}
if command.Resources.CPU < 2 {
return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", command.Resources.CPU)
if command.Resources.NomadResources.CPU < 2 {
return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", command.Resources.NomadResources.CPU)
}
// Set the relative CPU shares for this cgroup.
cfg.Cgroups.Resources.CpuShares = uint64(command.Resources.CPU)
cfg.Cgroups.Resources.CpuShares = uint64(command.Resources.NomadResources.CPU)
return nil
}
@@ -625,7 +639,9 @@ func newLibcontainerConfig(command *ExecCommand) (*lconfigs.Config, error) {
Version: "1.0.0",
}
configureCapabilities(cfg, command)
if err := configureCapabilities(cfg, command); err != nil {
return nil, err
}
if err := configureIsolation(cfg, command); err != nil {
return nil, err
}
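The cgroup values are now read through Resources.NomadResources, but the arithmetic is unchanged; a quick self-contained check of the conversions used in configureCgroups above:

package main

import "fmt"

// cgroupLimits mirrors the arithmetic in configureCgroups: MemoryMB is
// converted to bytes for memory.limit_in_bytes, and the Nomad CPU value is
// used directly as cpu.shares (and must be at least 2).
func cgroupLimits(memoryMB, cpu int) (memBytes int64, cpuShares uint64, err error) {
	if cpu < 2 {
		return 0, 0, fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", cpu)
	}
	return int64(memoryMB) * 1024 * 1024, uint64(cpu), nil
}

func main() {
	mem, shares, _ := cgroupLimits(256, 500)
	fmt.Println(mem, shares) // 268435456 500
}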

View File

@@ -1,6 +1,7 @@
package executor
import (
"context"
"fmt"
"io/ioutil"
"os"
@@ -67,10 +68,8 @@ func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocD
cmd := &ExecCommand{
Env: taskEnv.List(),
TaskDir: td.Dir,
Resources: &Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
DiskMB: task.Resources.DiskMB,
Resources: &drivers.Resources{
NomadResources: task.Resources,
},
}
configureTLogging(cmd)
@@ -97,7 +96,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
require.NoError(err)
require.NotZero(ps.Pid)
state, err := executor.Wait()
state, err := executor.Wait(context.Background())
require.NoError(err)
require.Zero(state.ExitCode)
@@ -110,11 +109,11 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
data, err := ioutil.ReadFile(memLimits)
require.NoError(err)
expectedMemLim := strconv.Itoa(execCmd.Resources.MemoryMB * 1024 * 1024)
expectedMemLim := strconv.Itoa(execCmd.Resources.NomadResources.MemoryMB * 1024 * 1024)
actualMemLim := strings.TrimSpace(string(data))
require.Equal(actualMemLim, expectedMemLim)
require.NoError(executor.Shutdown("", 0))
executor.Wait()
executor.Wait(context.Background())
// Check if Nomad has actually removed the cgroups
tu.WaitForResult(func() (bool, error) {
@@ -145,7 +144,8 @@ ld.so.cache
ld.so.conf
ld.so.conf.d/`
tu.WaitForResult(func() (bool, error) {
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
act := strings.TrimSpace(string(output))
if act != expected {
return false, fmt.Errorf("Command output incorrect: want %v; got %v", expected, act)
@@ -180,7 +180,7 @@ func TestExecutor_ClientCleanup(t *testing.T) {
ch := make(chan interface{})
go func() {
executor.Wait()
executor.Wait(context.Background())
close(ch)
}()
@@ -191,10 +191,11 @@ func TestExecutor_ClientCleanup(t *testing.T) {
require.Fail("timeout waiting for exec to shutdown")
}
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
require.NotZero(len(output))
time.Sleep(2 * time.Second)
output1 := execCmd.stdout.(*bufferCloser).String()
output1 := outWriter.(*bufferCloser).String()
require.Equal(len(output), len(output1))
}

View File

@@ -0,0 +1,33 @@
package executor
import (
"context"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"google.golang.org/grpc"
)
type ExecutorPlugin struct {
// TODO: support backwards compatibility with pre 0.9 NetRPC plugin
plugin.NetRPCUnsupportedPlugin
logger hclog.Logger
fsIsolation bool
}
func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
if p.fsIsolation {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutorWithIsolation(p.logger)})
} else {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutor(p.logger)})
}
return nil
}
func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcExecutorClient{
client: proto.NewExecutorClient(c),
doneCtx: ctx,
}, nil
}
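GetPluginMap, which hands this ExecutorPlugin to go-plugin under the "executor" dispense key, is referenced elsewhere in the PR but not shown in this diff. A hypothetical sketch, inferred only from its call sites (GetPluginMap(w, level, fsIsolation)):

package executor

import (
	"io"

	hclog "github.com/hashicorp/go-hclog"
	plugin "github.com/hashicorp/go-plugin"
)

// Hypothetical sketch: GetPluginMap is not in this diff; this shape is
// inferred from its call sites and the "executor" dispense key used by
// CreateExecutor below.
func GetPluginMap(w io.Writer, level hclog.Level, fsIsolation bool) map[string]plugin.Plugin {
	return map[string]plugin.Plugin{
		"executor": &ExecutorPlugin{
			logger:      hclog.New(&hclog.LoggerOptions{Output: w, Level: level}),
			fsIsolation: fsIsolation,
		},
	}
}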

View File

@@ -2,6 +2,7 @@ package executor
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@@ -12,13 +13,15 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/plugins/drivers"
tu "github.com/hashicorp/nomad/testutil"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
@@ -51,10 +54,8 @@ func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
cmd := &ExecCommand{
Env: taskEnv.List(),
TaskDir: td.Dir,
Resources: &Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
DiskMB: task.Resources.DiskMB,
Resources: &drivers.Resources{
NomadResources: task.Resources,
},
}
@@ -70,8 +71,7 @@ type bufferCloser struct {
func (_ *bufferCloser) Close() error { return nil }
func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) {
cmd.stdout = &stdout
cmd.stderr = &stderr
cmd.SetWriters(&stdout, &stderr)
return
}
@@ -109,7 +109,7 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) {
ps, err := executor.Launch(execCmd)
require.NoError(err)
require.NotZero(ps.Pid)
ps, _ = executor.Wait()
ps, _ = executor.Wait(context.Background())
require.NotZero(ps.ExitCode, "expected exit code to be non zero")
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
})
@@ -133,13 +133,14 @@ func TestExecutor_Start_Wait(pt *testing.T) {
require.NoError(err)
require.NotZero(ps.Pid)
ps, err = executor.Wait()
ps, err = executor.Wait(context.Background())
require.NoError(err)
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
expected := "hello world"
tu.WaitForResult(func() (bool, error) {
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
act := strings.TrimSpace(string(output))
if expected != act {
return false, fmt.Errorf("expected: '%s' actual: '%s'", expected, act)
@@ -178,7 +179,7 @@ func TestExecutor_WaitExitSignal(pt *testing.T) {
require.NoError(err)
}()
ps, err = executor.Wait()
ps, err = executor.Wait(context.Background())
require.NoError(err)
require.Equal(ps.Signal, int(syscall.SIGKILL))
})
@@ -204,7 +205,8 @@ func TestExecutor_Start_Kill(pt *testing.T) {
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
expected := ""
act := strings.TrimSpace(string(output))
if act != expected {

File diff suppressed because it is too large

View File

@@ -0,0 +1,91 @@
syntax = "proto3";
package hashicorp.nomad.plugins.executor.proto;
option go_package = "proto";
import "google/protobuf/timestamp.proto";
import "github.com/hashicorp/nomad/plugins/drivers/proto/driver.proto";
service Executor {
rpc Launch(LaunchRequest) returns (LaunchResponse) {}
rpc Wait(WaitRequest) returns (WaitResponse) {}
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {}
rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesResponse) {}
rpc Version(VersionRequest) returns (VersionResponse) {}
rpc Stats(StatsRequest) returns (StatsResponse) {}
rpc Signal(SignalRequest) returns (SignalResponse) {}
rpc Exec(ExecRequest) returns (ExecResponse) {}
}
message LaunchRequest {
string cmd = 1;
repeated string args = 2;
hashicorp.nomad.plugins.drivers.proto.Resources resources = 3;
string stdout_path = 4;
string stderr_path = 5;
repeated string env = 6;
string user = 7;
string task_dir = 8;
bool resource_limits = 9;
bool basic_process_cgroup = 10;
repeated hashicorp.nomad.plugins.drivers.proto.Mount mounts = 11;
repeated hashicorp.nomad.plugins.drivers.proto.Device devices = 12;
}
message LaunchResponse {
ProcessState process = 1;
}
message WaitRequest {}
message WaitResponse{
ProcessState process = 1;
}
message ShutdownRequest {
string signal = 1;
int64 grace_period = 2;
}
message ShutdownResponse {}
message UpdateResourcesRequest{
hashicorp.nomad.plugins.drivers.proto.Resources resources = 1;
}
message UpdateResourcesResponse {}
message VersionRequest {}
message VersionResponse{
string version = 1;
}
message StatsRequest {}
message StatsResponse {
hashicorp.nomad.plugins.drivers.proto.TaskStats stats = 1;
}
message SignalRequest {
int32 signal = 1;
}
message SignalResponse {}
message ExecRequest {
google.protobuf.Timestamp deadline = 1;
string cmd = 2;
repeated string args = 3;
}
message ExecResponse {
bytes output = 1;
int32 exit_code = 2;
}
message ProcessState {
int32 pid = 1;
int32 exit_code = 2;
int32 signal = 3;
google.protobuf.Timestamp time = 4;
}
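One wire-format detail: grace_period is a plain int64 of nanoseconds rather than a google.protobuf.Duration, and the two ends convert it symmetrically (the client sends gracePeriod.Nanoseconds(), the server rebuilds time.Duration(req.GracePeriod)). A round-trip check:

package main

import (
	"fmt"
	"time"
)

func main() {
	grace := 5 * time.Second
	wire := grace.Nanoseconds()      // client side: proto.ShutdownRequest.GracePeriod
	back := time.Duration(wire)      // server side: time.Duration(req.GracePeriod)
	fmt.Println(wire, back == grace) // 5000000000 true
}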

View File

@@ -0,0 +1,129 @@
package executor
import (
"syscall"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/net/context"
)
type grpcExecutorServer struct {
impl Executor
}
func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) {
ps, err := s.impl.Launch(&ExecCommand{
Cmd: req.Cmd,
Args: req.Args,
Resources: drivers.ResourcesFromProto(req.Resources),
StdoutPath: req.StdoutPath,
StderrPath: req.StderrPath,
Env: req.Env,
User: req.User,
TaskDir: req.TaskDir,
ResourceLimits: req.ResourceLimits,
BasicProcessCgroup: req.BasicProcessCgroup,
Mounts: drivers.MountsFromProto(req.Mounts),
Devices: drivers.DevicesFromProto(req.Devices),
})
if err != nil {
return nil, err
}
process, err := processStateToProto(ps)
if err != nil {
return nil, err
}
return &proto.LaunchResponse{
Process: process,
}, nil
}
func (s *grpcExecutorServer) Wait(ctx context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) {
ps, err := s.impl.Wait(ctx)
if err != nil {
return nil, err
}
process, err := processStateToProto(ps)
if err != nil {
return nil, err
}
return &proto.WaitResponse{
Process: process,
}, nil
}
func (s *grpcExecutorServer) Shutdown(ctx context.Context, req *proto.ShutdownRequest) (*proto.ShutdownResponse, error) {
if err := s.impl.Shutdown(req.Signal, time.Duration(req.GracePeriod)); err != nil {
return nil, err
}
return &proto.ShutdownResponse{}, nil
}
func (s *grpcExecutorServer) UpdateResources(ctx context.Context, req *proto.UpdateResourcesRequest) (*proto.UpdateResourcesResponse, error) {
if err := s.impl.UpdateResources(drivers.ResourcesFromProto(req.Resources)); err != nil {
return nil, err
}
return &proto.UpdateResourcesResponse{}, nil
}
func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*proto.VersionResponse, error) {
v, err := s.impl.Version()
if err != nil {
return nil, err
}
return &proto.VersionResponse{
Version: v.Version,
}, nil
}
func (s *grpcExecutorServer) Stats(context.Context, *proto.StatsRequest) (*proto.StatsResponse, error) {
stats, err := s.impl.Stats()
if err != nil {
return nil, err
}
pbStats, err := drivers.TaskStatsToProto(stats)
if err != nil {
return nil, err
}
return &proto.StatsResponse{
Stats: pbStats,
}, nil
}
func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) {
sig := syscall.Signal(req.Signal)
if err := s.impl.Signal(sig); err != nil {
return nil, err
}
return &proto.SignalResponse{}, nil
}
func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (*proto.ExecResponse, error) {
deadline, err := ptypes.Timestamp(req.Deadline)
if err != nil {
return nil, err
}
out, exit, err := s.impl.Exec(deadline, req.Cmd, req.Args)
if err != nil {
return nil, err
}
return &proto.ExecResponse{
Output: out,
ExitCode: int32(exit),
}, nil
}

View File

@@ -0,0 +1,131 @@
package executor
import (
"encoding/json"
"fmt"
"io"
"os/exec"
"github.com/golang/protobuf/ptypes"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/plugins/base"
)
const (
// ExecutorDefaultMaxPort is the default max port used by the executor for
// searching for an available port
ExecutorDefaultMaxPort = 14512
// ExecutorDefaultMinPort is the default min port used by the executor for
// searching for an available port
ExecutorDefaultMinPort = 14000
)
// CreateExecutor launches an executor plugin and returns an instance of the
// Executor interface
func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDriverConfig,
executorConfig *ExecutorConfig) (Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create executor config: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
config := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", string(c)),
}
config.HandshakeConfig = base.Handshake
config.Plugins = GetPluginMap(w, level, executorConfig.FSIsolation)
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}
if driverConfig != nil {
config.MaxPort = driverConfig.ClientMaxPort
config.MinPort = driverConfig.ClientMinPort
} else {
config.MaxPort = ExecutorDefaultMaxPort
config.MinPort = ExecutorDefaultMinPort
}
// Set the setsid flag on the plugin process so that it doesn't receive
// signals sent to the Nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(Executor)
return executorPlugin, executorClient, nil
}
// CreateExecutorWithConfig launches a plugin with a given plugin config
func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (Executor, *plugin.Client, error) {
config.HandshakeConfig = base.Handshake
// Setting this to DEBUG since the log level at the executor server process
// is already set, and this affects only the executor client.
// TODO: Use versioned plugin map to support backwards compatibility with
// existing pre-0.9 executors
config.Plugins = GetPluginMap(w, hclog.Debug, false)
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin, ok := raw.(Executor)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}
return executorPlugin, executorClient, nil
}
func processStateToProto(ps *ProcessState) (*proto.ProcessState, error) {
timestamp, err := ptypes.TimestampProto(ps.Time)
if err != nil {
return nil, err
}
pb := &proto.ProcessState{
Pid: int32(ps.Pid),
ExitCode: int32(ps.ExitCode),
Signal: int32(ps.Signal),
Time: timestamp,
}
return pb, nil
}
func processStateFromProto(pb *proto.ProcessState) (*ProcessState, error) {
timestamp, err := ptypes.Timestamp(pb.Time)
if err != nil {
return nil, err
}
return &ProcessState{
Pid: int(pb.Pid),
ExitCode: int(pb.ExitCode),
Signal: int(pb.Signal),
Time: timestamp,
}, nil
}

View File

@@ -0,0 +1,18 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package executor
import (
"os/exec"
"syscall"
)
// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
// becomes the process leader in a new session and doesn't receive signals that
// are sent to the parent process.
func isolateCommand(cmd *exec.Cmd) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Setsid = true
}
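A short usage sketch (isolateCommand is repeated here so the example runs on its own): the plugin launcher applies it to the executor subprocess before Start, so terminal-generated signals such as a Ctrl-C delivered to the Nomad client never reach the child:

package main

import (
	"os/exec"
	"syscall"
)

// isolateCommand, copied from above: put the child in its own session.
func isolateCommand(cmd *exec.Cmd) {
	if cmd.SysProcAttr == nil {
		cmd.SysProcAttr = &syscall.SysProcAttr{}
	}
	cmd.SysProcAttr.Setsid = true
}

func main() {
	cmd := exec.Command("/bin/sleep", "60")
	isolateCommand(cmd)
	_ = cmd.Start() // sleep now leads its own session, outside the client's process group
	_ = cmd.Wait()
}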

View File

@@ -0,0 +1,6 @@
package executor
import "os/exec"
// TODO Figure out if this is needed in Windows
func isolateCommand(cmd *exec.Cmd) {}

View File

@@ -265,7 +265,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa
return nil, err
}
stats, err := taskStatsFromProto(resp.Stats)
stats, err := TaskStatsFromProto(resp.Stats)
if err != nil {
return nil, err
}

View File

@@ -230,7 +230,7 @@ func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStats
return nil, err
}
pb, err := taskStatsToProto(stats)
pb, err := TaskStatsToProto(stats)
if err != nil {
return nil, fmt.Errorf("failed to encode task stats: %v", err)
}

View File

@@ -150,6 +150,7 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
return func() {
h.lm.Stop()
h.client.Close()
allocDir.Destroy()
}
}
@@ -158,6 +159,7 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
if h.lm != nil {
h.lm.Stop()
}
h.client.Close()
allocDir.Destroy()
}
}

View File

@@ -56,9 +56,9 @@ func taskConfigFromProto(pb *proto.TaskConfig) *TaskConfig {
Name: pb.Name,
Env: pb.Env,
rawDriverConfig: pb.MsgpackDriverConfig,
Resources: resourcesFromProto(pb.Resources),
Devices: devicesFromProto(pb.Devices),
Mounts: mountsFromProto(pb.Mounts),
Resources: ResourcesFromProto(pb.Resources),
Devices: DevicesFromProto(pb.Devices),
Mounts: MountsFromProto(pb.Mounts),
User: pb.User,
AllocDir: pb.AllocDir,
StdoutPath: pb.StdoutPath,
@@ -77,9 +77,9 @@ func taskConfigToProto(cfg *TaskConfig) *proto.TaskConfig {
TaskGroupName: cfg.TaskGroupName,
Name: cfg.Name,
Env: cfg.Env,
Resources: resourcesToProto(cfg.Resources),
Devices: devicesToProto(cfg.Devices),
Mounts: mountsToProto(cfg.Mounts),
Resources: ResourcesToProto(cfg.Resources),
Devices: DevicesToProto(cfg.Devices),
Mounts: MountsToProto(cfg.Mounts),
User: cfg.User,
AllocDir: cfg.AllocDir,
MsgpackDriverConfig: cfg.rawDriverConfig,
@@ -90,7 +90,7 @@ func taskConfigToProto(cfg *TaskConfig) *proto.TaskConfig {
return pb
}
func resourcesFromProto(pb *proto.Resources) *Resources {
func ResourcesFromProto(pb *proto.Resources) *Resources {
var r Resources
if pb == nil {
return &r
@@ -141,7 +141,7 @@ func resourcesFromProto(pb *proto.Resources) *Resources {
return &r
}
func resourcesToProto(r *Resources) *proto.Resources {
func ResourcesToProto(r *Resources) *proto.Resources {
if r == nil {
return nil
}
@@ -193,20 +193,20 @@ func resourcesToProto(r *Resources) *proto.Resources {
return &pb
}
func devicesFromProto(devices []*proto.Device) []*DeviceConfig {
func DevicesFromProto(devices []*proto.Device) []*DeviceConfig {
if devices == nil {
return nil
}
out := make([]*DeviceConfig, len(devices))
for i, d := range devices {
out[i] = deviceFromProto(d)
out[i] = DeviceFromProto(d)
}
return out
}
func deviceFromProto(device *proto.Device) *DeviceConfig {
func DeviceFromProto(device *proto.Device) *DeviceConfig {
if device == nil {
return nil
}
@@ -218,20 +218,20 @@ func deviceFromProto(device *proto.Device) *DeviceConfig {
}
}
func mountsFromProto(mounts []*proto.Mount) []*MountConfig {
func MountsFromProto(mounts []*proto.Mount) []*MountConfig {
if mounts == nil {
return nil
}
out := make([]*MountConfig, len(mounts))
for i, m := range mounts {
out[i] = mountFromProto(m)
out[i] = MountFromProto(m)
}
return out
}
func mountFromProto(mount *proto.Mount) *MountConfig {
func MountFromProto(mount *proto.Mount) *MountConfig {
if mount == nil {
return nil
}
@@ -243,20 +243,20 @@ func mountFromProto(mount *proto.Mount) *MountConfig {
}
}
func devicesToProto(devices []*DeviceConfig) []*proto.Device {
func DevicesToProto(devices []*DeviceConfig) []*proto.Device {
if devices == nil {
return nil
}
out := make([]*proto.Device, len(devices))
for i, d := range devices {
out[i] = deviceToProto(d)
out[i] = DeviceToProto(d)
}
return out
}
func deviceToProto(device *DeviceConfig) *proto.Device {
func DeviceToProto(device *DeviceConfig) *proto.Device {
if device == nil {
return nil
}
@@ -268,20 +268,20 @@ func deviceToProto(device *DeviceConfig) *proto.Device {
}
}
func mountsToProto(mounts []*MountConfig) []*proto.Mount {
func MountsToProto(mounts []*MountConfig) []*proto.Mount {
if mounts == nil {
return nil
}
out := make([]*proto.Mount, len(mounts))
for i, m := range mounts {
out[i] = mountToProto(m)
out[i] = MountToProto(m)
}
return out
}
func mountToProto(mount *MountConfig) *proto.Mount {
func MountToProto(mount *MountConfig) *proto.Mount {
if mount == nil {
return nil
}
@@ -371,7 +371,7 @@ func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) {
}, nil
}
func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
func TaskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
timestamp, err := ptypes.TimestampProto(time.Unix(0, stats.Timestamp))
if err != nil {
return nil, err
@@ -389,7 +389,7 @@ func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, erro
}, nil
}
func taskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) {
func TaskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) {
timestamp, err := ptypes.Timestamp(pb.Timestamp)
if err != nil {
return nil, err

View File

@@ -1,34 +1,13 @@
package utils
import (
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strings"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
pexecutor "github.com/hashicorp/nomad/plugins/executor"
)
const (
// ExecutorDefaultMaxPort is the default max port used by the executor for
// searching for an available port
ExecutorDefaultMaxPort = 14512
// ExecutorDefaultMinPort is the default min port used by the executor for
// searching for an available port
ExecutorDefaultMinPort = 14000
)
// SetEnvvars sets path and host env vars depending on the FS isolation used.
@@ -60,85 +39,3 @@ func CgroupsMounted(node *structs.Node) bool {
_, ok := node.Attributes["unique.cgroup.mountpoint"]
return ok
}
// CreateExecutor launches an executor plugin and returns an instance of the
// Executor interface
func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDriverConfig,
executorConfig *pexecutor.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create executor config: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
config := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", string(c)),
}
config.HandshakeConfig = base.Handshake
config.Plugins = pexecutor.GetPluginMap(w, level, executorConfig.FSIsolation)
if driverConfig != nil {
config.MaxPort = driverConfig.ClientMaxPort
config.MinPort = driverConfig.ClientMinPort
} else {
config.MaxPort = ExecutorDefaultMaxPort
config.MinPort = ExecutorDefaultMinPort
}
// setting the setsid of the plugin process so that it doesn't get signals sent to
// the nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
return executorPlugin, executorClient, nil
}
// CreateExecutorWithConfig launches a plugin with a given plugin config
func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = base.Handshake
// Setting this to DEBUG since the log level at the executor server process
// is already set, and this effects only the executor client.
config.Plugins = pexecutor.GetPluginMap(w, hclog.Debug, false)
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin, ok := raw.(*pexecutor.ExecutorRPC)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}
return executorPlugin, executorClient, nil
}
// KillProcess kills a process with the given pid
func KillProcess(pid int) error {
proc, err := os.FindProcess(pid)
if err != nil {
return err
}
return proc.Kill()
}

View File

@@ -3,22 +3,9 @@
package utils
import (
"os/exec"
"syscall"
"golang.org/x/sys/unix"
)
// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
// becomes the process leader in a new session and doesn't receive signals that
// are sent to the parent process.
func isolateCommand(cmd *exec.Cmd) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Setsid = true
}
// IsUnixRoot returns true if system is unix and user running is effectively root
func IsUnixRoot() bool {
return unix.Geteuid() == 0

View File

@@ -1,12 +1,5 @@
package utils
import (
"os/exec"
)
// TODO Figure out if this is needed in Windows
func isolateCommand(cmd *exec.Cmd) {}
// IsUnixRoot returns true if system is a unix system and the effective uid of user is root
func IsUnixRoot() bool {
return false

View File

@@ -1,185 +0,0 @@
package executor
import (
"encoding/gob"
"net/rpc"
"os"
"syscall"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor"
)
// Registering these types since we have to serialize and de-serialize the Task
// structs over the wire between drivers and the executor.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]string{})
gob.Register([]map[string]int{})
gob.Register(syscall.Signal(0x1))
}
type ExecutorRPC struct {
client *rpc.Client
logger hclog.Logger
}
// LaunchCmdArgs wraps a user command and the args for the purposes of RPC
type LaunchArgs struct {
Cmd *executor.ExecCommand
}
// ShutdownArgs wraps shutdown signal and grace period
type ShutdownArgs struct {
Signal string
GracePeriod time.Duration
}
type ExecArgs struct {
Deadline time.Time
Name string
Args []string
}
type ExecReturn struct {
Output []byte
Code int
}
func (e *ExecutorRPC) Launch(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
var ps *executor.ProcessState
err := e.client.Call("Plugin.Launch", LaunchArgs{Cmd: cmd}, &ps)
return ps, err
}
func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) {
var ps executor.ProcessState
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
return &ps, err
}
func (e *ExecutorRPC) Kill() error {
return e.client.Call("Plugin.Kill", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) Shutdown(signal string, grace time.Duration) error {
return e.client.Call("Plugin.Shutdown", &ShutdownArgs{signal, grace}, new(interface{}))
}
func (e *ExecutorRPC) UpdateResources(resources *executor.Resources) error {
return e.client.Call("Plugin.UpdateResources", resources, new(interface{}))
}
func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) {
var version executor.ExecutorVersion
err := e.client.Call("Plugin.Version", new(interface{}), &version)
return &version, err
}
func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
var resourceUsage cstructs.TaskResourceUsage
err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage)
return &resourceUsage, err
}
func (e *ExecutorRPC) Signal(s os.Signal) error {
return e.client.Call("Plugin.Signal", &s, new(interface{}))
}
func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
req := ExecArgs{
Deadline: deadline,
Name: name,
Args: args,
}
var resp *ExecReturn
err := e.client.Call("Plugin.Exec", req, &resp)
if resp == nil {
return nil, 0, err
}
return resp.Output, resp.Code, err
}
type ExecutorRPCServer struct {
Impl executor.Executor
logger hclog.Logger
}
func (e *ExecutorRPCServer) Launch(args LaunchArgs, ps *executor.ProcessState) error {
state, err := e.Impl.Launch(args.Cmd)
if state != nil {
*ps = *state
}
return err
}
func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error {
state, err := e.Impl.Wait()
if state != nil {
*ps = *state
}
return err
}
func (e *ExecutorRPCServer) Shutdown(args ShutdownArgs, resp *interface{}) error {
return e.Impl.Shutdown(args.Signal, args.GracePeriod)
}
func (e *ExecutorRPCServer) UpdateResources(args *executor.Resources, resp *interface{}) error {
return e.Impl.UpdateResources(args)
}
func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error {
ver, err := e.Impl.Version()
if ver != nil {
*version = *ver
}
return err
}
func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error {
ru, err := e.Impl.Stats()
if ru != nil {
*resourceUsage = *ru
}
return err
}
func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error {
return e.Impl.Signal(args)
}
func (e *ExecutorRPCServer) Exec(args ExecArgs, result *ExecReturn) error {
out, code, err := e.Impl.Exec(args.Deadline, args.Name, args.Args)
ret := &ExecReturn{
Output: out,
Code: code,
}
*result = *ret
return err
}
type ExecutorPlugin struct {
logger hclog.Logger
fsIsolation bool
Impl *ExecutorRPCServer
}
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
if p.fsIsolation {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutorWithIsolation(p.logger), logger: p.logger}
} else {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger), logger: p.logger}
}
}
return p.Impl, nil
}
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &ExecutorRPC{client: c, logger: p.logger}, nil
}