executor: use grpc instead of netrpc as plugin protocol

* Added protobuf spec for executor
 * Separated executor structs into their own package
This commit is contained in:
Nick Ethier
2018-12-05 11:03:56 -05:00
parent cd8c5c55bd
commit 467930f650
17 changed files with 1911 additions and 379 deletions

View File

@@ -49,6 +49,7 @@ func (e *ExecutorPluginCommand) Run(args []string) int {
hclog.LevelFromString(executorConfig.LogLevel),
executorConfig.FSIsolation,
),
GRPCServer: plugin.DefaultGRPCServer,
})
return 0
}

View File

@@ -3,8 +3,6 @@ package executor
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
@@ -18,8 +16,8 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/fifo"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/consul-template/signals"
@@ -41,164 +39,14 @@ var (
ExecutorBasicMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"}
)
// Executor is the interface which allows a driver to launch and supervise
// a process
type Executor interface {
// Launch a user process configured by the given ExecCommand
Launch(launchCmd *ExecCommand) (*ProcessState, error)
// Wait blocks until the process exits or an error occures
Wait() (*ProcessState, error)
// Shutdown will shutdown the executor by stopping the user process,
// cleaning up and resources created by the executor. The shutdown sequence
// will first send the given signal to the process. This defaults to "SIGINT"
// if not specified. The executor will then wait for the process to exit
// before cleaning up other resources. If the executor waits longer than the
// given grace period, the process is forcefully killed.
//
// To force kill the user process, gracePeriod can be set to 0.
Shutdown(signal string, gracePeriod time.Duration) error
// UpdateResources updates any resource isolation enforcement with new
// constraints if supported.
UpdateResources(*Resources) error
// Version returns the executor API version
Version() (*ExecutorVersion, error)
// Stats fetchs process usage stats for the executor and each pid if available
Stats() (*cstructs.TaskResourceUsage, error)
// Signal sends the given signal to the user process
Signal(os.Signal) error
// Exec executes the given command and args inside the executor context
// and returns the output and exit code.
Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error)
}
// Resources describes the resource isolation required
type Resources struct {
CPU int
MemoryMB int
DiskMB int
IOPS int
}
// ExecCommand holds the user command, args, and other isolation related
// settings.
type ExecCommand struct {
// Cmd is the command that the user wants to run.
Cmd string
// Args is the args of the command that the user wants to run.
Args []string
// Resources defined by the task
Resources *Resources
// StdoutPath is the path the procoess stdout should be written to
StdoutPath string
stdout io.WriteCloser
// StderrPath is the path the procoess stderr should be written to
StderrPath string
stderr io.WriteCloser
// Env is the list of KEY=val pairs of environment variables to be set
Env []string
// User is the user which the executor uses to run the command.
User string
// TaskDir is the directory path on the host where for the task
TaskDir string
// ResourceLimits determines whether resource limits are enforced by the
// executor.
ResourceLimits bool
// Cgroup marks whether we put the process in a cgroup. Setting this field
// doesn't enforce resource limits. To enforce limits, set ResourceLimits.
// Using the cgroup does allow more precise cleanup of processes.
BasicProcessCgroup bool
}
type nopCloser struct {
io.Writer
}
func (nopCloser) Close() error { return nil }
// Stdout returns a writer for the configured file descriptor
func (c *ExecCommand) Stdout() (io.WriteCloser, error) {
if c.stdout == nil {
if c.StdoutPath != "" {
f, err := fifo.Open(c.StdoutPath)
if err != nil {
return nil, fmt.Errorf("failed to create stdout: %v", err)
}
c.stdout = f
} else {
c.stdout = nopCloser{ioutil.Discard}
}
}
return c.stdout, nil
}
// Stderr returns a writer for the configured file descriptor
func (c *ExecCommand) Stderr() (io.WriteCloser, error) {
if c.stderr == nil {
if c.StderrPath != "" {
f, err := fifo.Open(c.StderrPath)
if err != nil {
return nil, fmt.Errorf("failed to create stderr: %v", err)
}
c.stderr = f
} else {
c.stderr = nopCloser{ioutil.Discard}
}
}
return c.stderr, nil
}
func (c *ExecCommand) Close() {
stdout, err := c.Stdout()
if err == nil {
stdout.Close()
}
stderr, err := c.Stderr()
if err == nil {
stderr.Close()
}
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
Pid int
ExitCode int
Signal int
Time time.Time
}
// ExecutorVersion is the version of the executor
type ExecutorVersion struct {
Version string
}
func (v *ExecutorVersion) GoString() string {
return v.Version
}
// UniversalExecutor is an implementation of the Executor which launches and
// supervises processes. In addition to process supervision it provides resource
// and file system isolation
type UniversalExecutor struct {
childCmd exec.Cmd
commandCfg *ExecCommand
commandCfg *structs.ExecCommand
exitState *ProcessState
exitState *structs.ProcessState
processExited chan interface{}
// resConCtx is used to track and cleanup additional resources created by
@@ -214,7 +62,7 @@ type UniversalExecutor struct {
}
// NewExecutor returns an Executor
func NewExecutor(logger hclog.Logger) Executor {
func NewExecutor(logger hclog.Logger) structs.Executor {
logger = logger.Named("executor")
if err := shelpers.Init(); err != nil {
logger.Error("unable to initialize stats", "error", err)
@@ -230,13 +78,13 @@ func NewExecutor(logger hclog.Logger) Executor {
}
// Version returns the api version of the executor
func (e *UniversalExecutor) Version() (*ExecutorVersion, error) {
return &ExecutorVersion{Version: ExecutorVersionLatest}, nil
func (e *UniversalExecutor) Version() (*structs.ExecutorVersion, error) {
return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil
}
// Launch launches the main process and returns its state. It also
// configures an applies isolation on certain platforms.
func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
func (e *UniversalExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) {
e.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
e.commandCfg = command
@@ -298,7 +146,7 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
go e.pidCollector.collectPids(e.processExited, getAllPids)
go e.wait()
return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil
return &structs.ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil
}
// Exec a command inside a container for exec and java drivers.
@@ -346,12 +194,16 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy
}
// Wait waits until a process has exited and returns it's exitcode and errors
func (e *UniversalExecutor) Wait() (*ProcessState, error) {
<-e.processExited
return e.exitState, nil
func (e *UniversalExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.processExited:
return e.exitState, nil
}
}
func (e *UniversalExecutor) UpdateResources(resources *Resources) error {
func (e *UniversalExecutor) UpdateResources(resources *structs.Resources) error {
return nil
}
@@ -360,7 +212,7 @@ func (e *UniversalExecutor) wait() {
pid := e.childCmd.Process.Pid
err := e.childCmd.Wait()
if err == nil {
e.exitState = &ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()}
e.exitState = &structs.ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()}
return
}
@@ -388,7 +240,7 @@ func (e *UniversalExecutor) wait() {
e.logger.Warn("unexpected Cmd.Wait() error type", "error", err)
}
e.exitState = &ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()}
e.exitState = &structs.ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()}
}
var (

View File

@@ -2,9 +2,12 @@
package executor
import hclog "github.com/hashicorp/go-hclog"
import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
)
func NewExecutorWithIsolation(logger hclog.Logger) Executor {
func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor {
logger = logger.Named("executor")
logger.Error("isolation executor is not supported on this platform, using default")
return NewExecutor(logger)

View File

@@ -19,6 +19,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/helper/discover"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/uuid"
@@ -61,7 +62,7 @@ func init() {
// LibcontainerExecutor implements an Executor with the runc/libcontainer api
type LibcontainerExecutor struct {
id string
command *ExecCommand
command *structs.ExecCommand
logger hclog.Logger
@@ -73,10 +74,10 @@ type LibcontainerExecutor struct {
container libcontainer.Container
userProc *libcontainer.Process
userProcExited chan interface{}
exitState *ProcessState
exitState *structs.ProcessState
}
func NewExecutorWithIsolation(logger hclog.Logger) Executor {
func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor {
logger = logger.Named("isolated_executor")
if err := shelpers.Init(); err != nil {
logger.Error("unable to initialize stats", "error", err)
@@ -92,7 +93,7 @@ func NewExecutorWithIsolation(logger hclog.Logger) Executor {
}
// Launch creates a new container in libcontainer and starts a new process with it
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
func (l *LibcontainerExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) {
l.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
// Find the nomad executable to launch the executor process with
bin, err := discover.NomadExecutable()
@@ -101,7 +102,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
}
if command.Resources == nil {
command.Resources = &Resources{}
command.Resources = &structs.Resources{}
}
l.command = command
@@ -206,7 +207,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
go l.pidCollector.collectPids(l.userProcExited, l.getAllPids)
go l.wait()
return &ProcessState{
return &structs.ProcessState{
Pid: pid,
ExitCode: -1,
Time: time.Now(),
@@ -231,9 +232,13 @@ func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) {
}
// Wait waits until a process has exited and returns it's exitcode and errors
func (l *LibcontainerExecutor) Wait() (*ProcessState, error) {
<-l.userProcExited
return l.exitState, nil
func (l *LibcontainerExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.userProcExited:
return l.exitState, nil
}
}
func (l *LibcontainerExecutor) wait() {
@@ -247,7 +252,7 @@ func (l *LibcontainerExecutor) wait() {
ps = exitErr.ProcessState
} else {
l.logger.Error("failed to call wait on user process", "error", err)
l.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}
l.exitState = &structs.ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}
return
}
}
@@ -265,7 +270,7 @@ func (l *LibcontainerExecutor) wait() {
}
}
l.exitState = &ProcessState{
l.exitState = &structs.ProcessState{
Pid: ps.Pid(),
ExitCode: exitCode,
Signal: signal,
@@ -327,13 +332,13 @@ func (l *LibcontainerExecutor) Shutdown(signal string, grace time.Duration) erro
}
// UpdateResources updates the resource isolation with new values to be enforced
func (l *LibcontainerExecutor) UpdateResources(resources *Resources) error {
func (l *LibcontainerExecutor) UpdateResources(resources *structs.Resources) error {
return nil
}
// Version returns the api version of the executor
func (l *LibcontainerExecutor) Version() (*ExecutorVersion, error) {
return &ExecutorVersion{Version: ExecutorVersionLatest}, nil
func (l *LibcontainerExecutor) Version() (*structs.ExecutorVersion, error) {
return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil
}
// Stats returns the resource statistics for processes managed by the executor
@@ -453,7 +458,7 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc
ch <- &waitResult{ps, err}
}
func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) {
func configureCapabilities(cfg *lconfigs.Config, command *structs.ExecCommand) {
// TODO: allow better control of these
cfg.Capabilities = &lconfigs.Capabilities{
Bounding: allCaps,
@@ -465,7 +470,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) {
}
func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) {
func configureIsolation(cfg *lconfigs.Config, command *structs.ExecCommand) {
defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV
// set the new root directory for the container
@@ -531,7 +536,7 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) {
}
}
func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error {
func configureCgroups(cfg *lconfigs.Config, command *structs.ExecCommand) error {
// If resources are not limited then manually create cgroups needed
if !command.ResourceLimits {
@@ -597,7 +602,7 @@ func configureBasicCgroups(cfg *lconfigs.Config) error {
return nil
}
func newLibcontainerConfig(command *ExecCommand) *lconfigs.Config {
func newLibcontainerConfig(command *structs.ExecCommand) *lconfigs.Config {
cfg := &lconfigs.Config{
Cgroups: &lconfigs.Cgroup{
Resources: &lconfigs.Resources{

View File

@@ -15,6 +15,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
tu "github.com/hashicorp/nomad/testutil"
@@ -25,7 +26,7 @@ func init() {
executorFactories["LibcontainerExecutor"] = libcontainerFactory
}
func libcontainerFactory(l hclog.Logger) Executor {
func libcontainerFactory(l hclog.Logger) structs.Executor {
return NewExecutorWithIsolation(l)
}
@@ -33,7 +34,7 @@ func libcontainerFactory(l hclog.Logger) Executor {
// chroot. Use testExecutorContext if you don't need a chroot.
//
// The caller is responsible for calling AllocDir.Destroy() to cleanup.
func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
func testExecutorCommandWithChroot(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) {
chrootEnv := map[string]string{
"/etc/ld.so.cache": "/etc/ld.so.cache",
"/etc/ld.so.conf": "/etc/ld.so.conf",
@@ -61,10 +62,10 @@ func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocD
t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err)
}
td := allocDir.TaskDirs[task.Name]
cmd := &ExecCommand{
cmd := &structs.ExecCommand{
Env: taskEnv.List(),
TaskDir: td.Dir,
Resources: &Resources{
Resources: &structs.Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
IOPS: task.Resources.IOPS,
@@ -143,7 +144,8 @@ ld.so.cache
ld.so.conf
ld.so.conf.d/`
tu.WaitForResult(func() (bool, error) {
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
act := strings.TrimSpace(string(output))
if act != expected {
return false, fmt.Errorf("Command output incorrectly: want %v; got %v", expected, act)
@@ -176,9 +178,10 @@ func TestExecutor_ClientCleanup(t *testing.T) {
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
executor.Wait()
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
require.NotZero(len(output))
time.Sleep(2 * time.Second)
output1 := execCmd.stdout.(*bufferCloser).String()
output1 := outWriter.(*bufferCloser).String()
require.Equal(len(output), len(output1))
}

View File

@@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
tu "github.com/hashicorp/nomad/testutil"
hclog "github.com/hashicorp/go-hclog"
@@ -22,8 +23,8 @@ import (
"github.com/stretchr/testify/require"
)
var executorFactories = map[string]func(hclog.Logger) Executor{}
var universalFactory = func(l hclog.Logger) Executor {
var executorFactories = map[string]func(hclog.Logger) structs.Executor{}
var universalFactory = func(l hclog.Logger) structs.Executor {
return NewExecutor(l)
}
@@ -34,7 +35,7 @@ func init() {
// testExecutorContext returns an ExecutorContext and AllocDir.
//
// The caller is responsible for calling AllocDir.Destroy() to cleanup.
func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
func testExecutorCommand(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, task, "global").Build()
@@ -48,10 +49,10 @@ func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err)
}
td := allocDir.TaskDirs[task.Name]
cmd := &ExecCommand{
cmd := &structs.ExecCommand{
Env: taskEnv.List(),
TaskDir: td.Dir,
Resources: &Resources{
Resources: &structs.Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
IOPS: task.Resources.IOPS,
@@ -68,9 +69,8 @@ type bufferCloser struct {
func (_ *bufferCloser) Close() error { return nil }
func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) {
cmd.stdout = &stdout
cmd.stderr = &stderr
func configureTLogging(cmd *structs.ExecCommand) (stdout bufferCloser, stderr bufferCloser) {
cmd.SetWriters(&stdout, &stderr)
return
}
@@ -140,7 +140,8 @@ func TestExecutor_Start_Wait(pt *testing.T) {
expected := "hello world"
tu.WaitForResult(func() (bool, error) {
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
act := strings.TrimSpace(string(output))
if expected != act {
return false, fmt.Errorf("expected: '%s' actual: '%s'", expected, act)
@@ -207,7 +208,8 @@ func TestExecutor_Start_Kill(pt *testing.T) {
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
output := execCmd.stdout.(*bufferCloser).String()
outWriter, _ := execCmd.GetWriters()
output := outWriter.(*bufferCloser).String()
expected := ""
act := strings.TrimSpace(string(output))
if act != expected {

View File

@@ -0,0 +1,177 @@
package structs
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/hashicorp/nomad/client/lib/fifo"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// Executor is the interface which allows a driver to launch and supervise
// a process.
type Executor interface {
// Launch starts a user process configured by the given ExecCommand and
// returns the state of the launched process.
Launch(launchCmd *ExecCommand) (*ProcessState, error)
// Wait blocks until the process exits or an error occurs. The context
// lets the caller abandon the wait before the process has exited.
Wait(ctx context.Context) (*ProcessState, error)
// Shutdown will shut down the executor by stopping the user process and
// cleaning up any resources created by the executor. The shutdown sequence
// will first send the given signal to the process. This defaults to "SIGINT"
// if not specified. The executor will then wait for the process to exit
// before cleaning up other resources. If the executor waits longer than the
// given grace period, the process is forcefully killed.
//
// To force kill the user process, gracePeriod can be set to 0.
Shutdown(signal string, gracePeriod time.Duration) error
// UpdateResources updates any resource isolation enforcement with new
// constraints if supported.
UpdateResources(*Resources) error
// Version returns the executor API version.
Version() (*ExecutorVersion, error)
// Stats fetches process usage stats for the executor and each pid if
// available.
Stats() (*cstructs.TaskResourceUsage, error)
// Signal sends the given signal to the user process.
Signal(os.Signal) error
// Exec executes the given command and args inside the executor context
// and returns the output and exit code.
Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error)
}
// Resources describes the resource isolation required.
//
// NOTE(review): units are not defined in this file; the MB suffixes suggest
// megabytes and CPU/IOPS are plain integers — confirm against the callers
// that populate these fields.
type Resources struct {
CPU int
MemoryMB int
DiskMB int
IOPS int
}
// ExecCommand holds the user command, args, and other isolation related
// settings.
type ExecCommand struct {
// Cmd is the command that the user wants to run.
Cmd string
// Args is the args of the command that the user wants to run.
Args []string
// Resources defined by the task.
Resources *Resources
// StdoutPath is the path the process stdout should be written to. It is
// opened with fifo.Open on the first call to Stdout; when empty, stdout is
// discarded.
StdoutPath string
// stdout caches the stdout writer. It is either injected through
// SetWriters or created lazily by Stdout.
stdout io.WriteCloser
// StderrPath is the path the process stderr should be written to. It is
// opened with fifo.Open on the first call to Stderr; when empty, stderr is
// discarded.
StderrPath string
// stderr caches the stderr writer. It is either injected through
// SetWriters or created lazily by Stderr.
stderr io.WriteCloser
// Env is the list of KEY=val pairs of environment variables to be set.
Env []string
// User is the user which the executor uses to run the command.
User string
// TaskDir is the directory path on the host for the task.
TaskDir string
// ResourceLimits determines whether resource limits are enforced by the
// executor.
ResourceLimits bool
// BasicProcessCgroup marks whether we put the process in a cgroup. Setting
// this field doesn't enforce resource limits. To enforce limits, set
// ResourceLimits. Using the cgroup does allow more precise cleanup of
// processes.
BasicProcessCgroup bool
}
// SetWriters installs the given writers as the process stdout and stderr
// handles, replacing whatever was stored before. It should not be used when
// output is written to a file path such as a fifo; SetWriters exists mainly
// for unit tests.
func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) {
	c.stdout, c.stderr = out, err
}
// GetWriters exposes the unexported stdout and stderr io.WriteCloser handles
// exactly as currently stored (either may be nil). It exists mainly for unit
// tests.
func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) {
	stdout = c.stdout
	stderr = c.stderr
	return stdout, stderr
}
// nopCloser wraps an io.Writer with a Close method that does nothing, so the
// writer can be used where an io.WriteCloser is required.
type nopCloser struct {
io.Writer
}
// Close is a no-op and always returns nil.
func (nopCloser) Close() error { return nil }
// Stdout returns the writer for the process stdout, creating it on first use.
//
// An already stored writer is returned as is. Otherwise, when StdoutPath is
// empty a discarding writer is stored and returned; when it is set, the path
// is opened with fifo.Open and the resulting handle is stored and returned.
// A failure to open the path is returned as an error and nothing is stored.
func (c *ExecCommand) Stdout() (io.WriteCloser, error) {
	if c.stdout != nil {
		return c.stdout, nil
	}

	if c.StdoutPath == "" {
		c.stdout = nopCloser{ioutil.Discard}
		return c.stdout, nil
	}

	w, openErr := fifo.Open(c.StdoutPath)
	if openErr != nil {
		return nil, fmt.Errorf("failed to create stdout: %v", openErr)
	}
	c.stdout = w
	return c.stdout, nil
}
// Stderr returns the writer for the process stderr, creating it on first use.
//
// An already stored writer is returned as is. Otherwise, when StderrPath is
// empty a discarding writer is stored and returned; when it is set, the path
// is opened with fifo.Open and the resulting handle is stored and returned.
// A failure to open the path is returned as an error and nothing is stored.
func (c *ExecCommand) Stderr() (io.WriteCloser, error) {
	if c.stderr != nil {
		return c.stderr, nil
	}

	if c.StderrPath == "" {
		c.stderr = nopCloser{ioutil.Discard}
		return c.stderr, nil
	}

	w, openErr := fifo.Open(c.StderrPath)
	if openErr != nil {
		return nil, fmt.Errorf("failed to create stderr: %v", openErr)
	}
	c.stderr = w
	return c.stderr, nil
}
// Close closes the stdout and stderr writers that are currently held by the
// command.
//
// Only writers that already exist are closed. Going through Stdout()/Stderr()
// here would lazily create a writer (opening StdoutPath/StderrPath) for the
// sole purpose of closing it again, so a command whose output was never
// opened is left untouched.
//
// Errors returned by the writers' Close methods are deliberately ignored:
// Close is best-effort cleanup and has no error return.
func (c *ExecCommand) Close() {
	if c.stdout != nil {
		c.stdout.Close()
	}
	if c.stderr != nil {
		c.stderr.Close()
	}
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
// Pid is the process ID of the user process.
Pid int
// ExitCode is the exit code of the process.
// NOTE(review): the launch paths in this change report -1 while the process
// is still running — confirm this convention with the executors.
ExitCode int
// Signal is the number of the signal associated with the process exit, if
// any — TODO confirm against the executors' wait logic.
Signal int
// Time is the timestamp at which this state was recorded.
Time time.Time
}
// ExecutorVersion is the version of the executor.
type ExecutorVersion struct {
// Version is the version string reported by the executor.
Version string
}
// GoString returns the bare version string. The method has the signature of
// fmt.GoStringer, so formatting a *ExecutorVersion with %#v prints the
// version rather than the struct literal.
func (v *ExecutorVersion) GoString() string {
return v.Version
}

View File

@@ -265,7 +265,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa
return nil, err
}
stats, err := taskStatsFromProto(resp.Stats)
stats, err := TaskStatsFromProto(resp.Stats)
if err != nil {
return nil, err
}

View File

@@ -230,7 +230,7 @@ func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStats
return nil, err
}
pb, err := taskStatsToProto(stats)
pb, err := TaskStatsToProto(stats)
if err != nil {
return nil, fmt.Errorf("failed to encode task stats: %v", err)
}

View File

@@ -373,7 +373,7 @@ func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) {
}, nil
}
func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
func TaskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
timestamp, err := ptypes.TimestampProto(time.Unix(0, stats.Timestamp))
if err != nil {
return nil, err
@@ -391,7 +391,7 @@ func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, erro
}, nil
}
func taskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) {
func TaskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) {
timestamp, err := ptypes.Timestamp(pb.Timestamp)
if err != nil {
return nil, err

View File

@@ -14,7 +14,7 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/drivers/shared/executor"
estructs "github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
@@ -64,7 +64,7 @@ func CgroupsMounted(node *structs.Node) bool {
// CreateExecutor launches an executor plugin and returns an instance of the
// Executor interface
func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDriverConfig,
executorConfig *pexecutor.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
executorConfig *pexecutor.ExecutorConfig) (estructs.Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
@@ -80,6 +80,7 @@ func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDri
}
config.HandshakeConfig = base.Handshake
config.Plugins = pexecutor.GetPluginMap(w, level, executorConfig.FSIsolation)
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}
if driverConfig != nil {
config.MaxPort = driverConfig.ClientMaxPort
@@ -105,17 +106,20 @@ func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDri
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
executorPlugin := raw.(estructs.Executor)
return executorPlugin, executorClient, nil
}
// CreateExecutorWithConfig launches a plugin with a given plugin config
func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) {
func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (estructs.Executor, *plugin.Client, error) {
config.HandshakeConfig = base.Handshake
// Setting this to DEBUG since the log level at the executor server process
// is already set, and this affects only the executor client.
// TODO: Use versioned plugin map to support backwards compatibility with
// existing pre-0.9 executors
config.Plugins = pexecutor.GetPluginMap(w, hclog.Debug, false)
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
@@ -127,7 +131,7 @@ func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executo
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin, ok := raw.(*pexecutor.ExecutorRPC)
executorPlugin, ok := raw.(estructs.Executor)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}

145
plugins/executor/client.go Normal file
View File

@@ -0,0 +1,145 @@
package executor
import (
"context"
"os"
"time"
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/executor/proto"
)
// Compile-time check that grpcExecutorClient satisfies structs.Executor.
var _ structs.Executor = (*grpcExecutorClient)(nil)
// grpcExecutorClient implements structs.Executor by forwarding each call to
// the executor plugin through its proto.ExecutorClient.
type grpcExecutorClient struct {
// client issues the executor RPCs.
client proto.ExecutorClient
// doneCtx is done (cancelled) when the plugin exits
doneCtx context.Context
}
// Launch asks the executor plugin to start the process described by cmd and
// returns the process state reported back by the plugin.
//
// Every exported field of cmd is copied into the request; the unexported
// stdout/stderr writers are not sent, only the StdoutPath/StderrPath values.
// An RPC failure or a failure converting the response is returned as is.
func (c *grpcExecutorClient) Launch(cmd *structs.ExecCommand) (*structs.ProcessState, error) {
	resp, err := c.client.Launch(context.Background(), &proto.LaunchRequest{
		Cmd:                cmd.Cmd,
		Args:               cmd.Args,
		Resources:          resourcesToProto(cmd.Resources),
		StdoutPath:         cmd.StdoutPath,
		StderrPath:         cmd.StderrPath,
		Env:                cmd.Env,
		User:               cmd.User,
		TaskDir:            cmd.TaskDir,
		ResourceLimits:     cmd.ResourceLimits,
		BasicProcessCgroup: cmd.BasicProcessCgroup,
	})
	if err != nil {
		return nil, err
	}

	state, convErr := processStateFromProto(resp.Process)
	if convErr != nil {
		return nil, convErr
	}
	return state, nil
}
// Wait blocks until the executor plugin reports that the user process has
// exited and returns the final process state.
//
// The call is abandoned with an error as soon as either the caller's ctx or
// the client's doneCtx (plugin exit) is done. RPC errors and errors from
// converting the response are returned as is.
func (c *grpcExecutorClient) Wait(ctx context.Context) (*structs.ProcessState, error) {
	// Join the passed context and the shutdown context. The returned cancel
	// func must be called once the RPC is finished; dropping it keeps the
	// joined context's resources alive until one of the parent contexts ends.
	ctx, cancel := joincontext.Join(ctx, c.doneCtx)
	defer cancel()

	resp, err := c.client.Wait(ctx, &proto.WaitRequest{})
	if err != nil {
		return nil, err
	}

	ps, err := processStateFromProto(resp.Process)
	if err != nil {
		return nil, err
	}

	return ps, nil
}
// Shutdown forwards a shutdown request to the executor plugin, carrying the
// signal name and the grace period. The grace period is transmitted as a
// nanosecond count. Any RPC error is returned to the caller.
func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) error {
	_, err := c.client.Shutdown(context.Background(), &proto.ShutdownRequest{
		Signal:      signal,
		GracePeriod: gracePeriod.Nanoseconds(),
	})
	return err
}
// UpdateResources sends the given resources, converted with
// resourcesToProto, to the executor plugin. Any RPC error is returned to the
// caller.
func (c *grpcExecutorClient) UpdateResources(r *structs.Resources) error {
	update := &proto.UpdateResourcesRequest{Resources: resourcesToProto(r)}
	_, err := c.client.UpdateResources(context.Background(), update)
	return err
}
// Version queries the executor plugin for its API version and wraps the
// returned string in a structs.ExecutorVersion. Any RPC error is returned to
// the caller.
func (c *grpcExecutorClient) Version() (*structs.ExecutorVersion, error) {
	resp, err := c.client.Version(context.Background(), &proto.VersionRequest{})
	if err != nil {
		return nil, err
	}

	version := &structs.ExecutorVersion{Version: resp.Version}
	return version, nil
}
// Stats requests resource usage statistics from the executor plugin and
// converts the response with drivers.TaskStatsFromProto. RPC errors and
// conversion errors are returned to the caller with a nil result.
func (c *grpcExecutorClient) Stats() (*cstructs.TaskResourceUsage, error) {
	resp, err := c.client.Stats(context.Background(), &proto.StatsRequest{})
	if err != nil {
		return nil, err
	}

	usage, convErr := drivers.TaskStatsFromProto(resp.Stats)
	if convErr != nil {
		return nil, convErr
	}
	return usage, nil
}
// Signal asks the executor plugin to deliver the given signal to the user
// process. The signal is transmitted as the text returned by s.String(). Any
// RPC error is returned to the caller.
//
// NOTE(review): for syscall.Signal values String() yields the signal's
// description (e.g. "interrupt"), not a symbolic name such as "SIGINT" —
// confirm the server side expects that form.
func (c *grpcExecutorClient) Signal(s os.Signal) error {
	_, err := c.client.Signal(context.Background(), &proto.SignalRequest{
		Signal: s.String(),
	})
	return err
}
// Exec runs cmd with args through the executor plugin and returns the
// command output and exit code taken from the response.
//
// The deadline is converted to a protobuf timestamp before sending; if that
// conversion or the RPC fails, the error is returned with a nil output and an
// exit code of 0.
func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) {
	ts, err := ptypes.TimestampProto(deadline)
	if err != nil {
		return nil, 0, err
	}

	resp, err := c.client.Exec(context.Background(), &proto.ExecRequest{
		Deadline: ts,
		Cmd:      cmd,
		Args:     args,
	})
	if err != nil {
		return nil, 0, err
	}

	exitCode := int(resp.ExitCode)
	return resp.Output, exitCode, nil
}

View File

@@ -1,185 +1,34 @@
package executor
import (
"encoding/gob"
"net/rpc"
"os"
"syscall"
"time"
"context"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/executor/proto"
"google.golang.org/grpc"
)
// Registering these types since we have to serialize and de-serialize the Task
// structs over the wire between drivers and the executor.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]string{})
gob.Register([]map[string]int{})
gob.Register(syscall.Signal(0x1))
}
type ExecutorRPC struct {
client *rpc.Client
logger hclog.Logger
}
// LaunchArgs wraps the user command to launch for the purposes of RPC.
type LaunchArgs struct {
// Cmd is the command handed to the executor's Launch method.
Cmd *executor.ExecCommand
}
// ShutdownArgs wraps the shutdown signal and grace period for the purposes of
// RPC. The field order matters: the client builds this struct with a
// positional literal.
type ShutdownArgs struct {
// Signal is the signal name passed to the executor's Shutdown method.
Signal string
// GracePeriod is the grace period passed to the executor's Shutdown method.
GracePeriod time.Duration
}
// ExecArgs carries the arguments of an Exec call over RPC.
type ExecArgs struct {
// Deadline is forwarded as the first argument of the executor's Exec method.
Deadline time.Time
// Name is the command to execute.
Name string
// Args are the arguments passed to the command.
Args []string
}
// ExecReturn carries the result of an Exec call back over RPC.
type ExecReturn struct {
// Output is the byte output returned by the executor's Exec method.
Output []byte
// Code is the integer exit code returned by the executor's Exec method.
Code int
}
// Launch invokes "Plugin.Launch" on the plugin with cmd wrapped in LaunchArgs
// and returns the process state the plugin filled in, together with any RPC
// error.
func (e *ExecutorRPC) Launch(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
	var state *executor.ProcessState
	args := LaunchArgs{Cmd: cmd}
	err := e.client.Call("Plugin.Launch", args, &state)
	return state, err
}
// Wait invokes "Plugin.Wait" on the plugin. The returned state is always
// non-nil; it holds whatever the plugin filled in, alongside any RPC error.
func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) {
	state := new(executor.ProcessState)
	err := e.client.Call("Plugin.Wait", new(interface{}), state)
	return state, err
}
// Kill invokes "Plugin.Kill" on the plugin with empty arguments and discards
// the reply, returning only the RPC error.
func (e *ExecutorRPC) Kill() error {
	var args, reply interface{}
	return e.client.Call("Plugin.Kill", &args, &reply)
}
// Shutdown invokes "Plugin.Shutdown" on the plugin with the given signal name
// and grace period, returning only the RPC error.
func (e *ExecutorRPC) Shutdown(signal string, grace time.Duration) error {
	args := &ShutdownArgs{
		Signal:      signal,
		GracePeriod: grace,
	}
	return e.client.Call("Plugin.Shutdown", args, new(interface{}))
}
// UpdateResources invokes "Plugin.UpdateResources" on the plugin with the new
// resources and discards the reply, returning only the RPC error.
func (e *ExecutorRPC) UpdateResources(resources *executor.Resources) error {
	var reply interface{}
	return e.client.Call("Plugin.UpdateResources", resources, &reply)
}
// Version invokes "Plugin.Version" on the plugin. The returned version is
// always non-nil; it holds whatever the plugin filled in, alongside any RPC
// error.
func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) {
	ver := new(executor.ExecutorVersion)
	err := e.client.Call("Plugin.Version", new(interface{}), ver)
	return ver, err
}
// Stats invokes "Plugin.Stats" on the plugin. The returned usage is always
// non-nil; it holds whatever the plugin filled in, alongside any RPC error.
func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
	usage := new(cstructs.TaskResourceUsage)
	err := e.client.Call("Plugin.Stats", new(interface{}), usage)
	return usage, err
}
// Signal invokes "Plugin.Signal" on the plugin, passing a pointer to the
// os.Signal value as the argument, and returns only the RPC error.
func (e *ExecutorRPC) Signal(s os.Signal) error {
	var reply interface{}
	return e.client.Call("Plugin.Signal", &s, &reply)
}
// Exec invokes "Plugin.Exec" on the plugin and returns the command's output
// and exit code. When the plugin leaves the reply unset, a nil output and a
// zero code are returned; the RPC error is passed through in both cases.
func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
	var reply *ExecReturn
	err := e.client.Call("Plugin.Exec", ExecArgs{Deadline: deadline, Name: name, Args: args}, &reply)
	if reply != nil {
		return reply.Output, reply.Code, err
	}
	return nil, 0, err
}
// ExecutorRPCServer is the net/rpc server side of the executor plugin. Each of
// its methods unpacks the RPC arguments and delegates to Impl.
type ExecutorRPCServer struct {
// Impl is the executor that performs the work.
Impl executor.Executor
// logger is stored on the struct; the methods visible here do not write to it.
logger hclog.Logger
}
// Launch starts args.Cmd through the wrapped executor. When the executor
// returns a state it is copied into ps; the executor's error is returned
// either way.
func (e *ExecutorRPCServer) Launch(args LaunchArgs, ps *executor.ProcessState) error {
	launched, err := e.Impl.Launch(args.Cmd)
	if launched == nil {
		return err
	}
	*ps = *launched
	return err
}
// Wait delegates to the wrapped executor's Wait. When the executor returns a
// state it is copied into ps; the executor's error is returned either way.
func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error {
	exited, err := e.Impl.Wait()
	if exited == nil {
		return err
	}
	*ps = *exited
	return err
}
// Shutdown unpacks the signal name and grace period from args and delegates
// to the wrapped executor's Shutdown. resp is left untouched.
func (e *ExecutorRPCServer) Shutdown(args ShutdownArgs, resp *interface{}) error {
	sig, grace := args.Signal, args.GracePeriod
	return e.Impl.Shutdown(sig, grace)
}
// UpdateResources hands the received resources to the wrapped executor and
// returns its error. resp is left untouched.
func (e *ExecutorRPCServer) UpdateResources(args *executor.Resources, resp *interface{}) error {
	err := e.Impl.UpdateResources(args)
	return err
}
// Version delegates to the wrapped executor's Version. When the executor
// returns a value it is copied into version; the executor's error is returned
// either way.
func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error {
	reported, err := e.Impl.Version()
	if reported == nil {
		return err
	}
	*version = *reported
	return err
}
// Stats delegates to the wrapped executor's Stats. When the executor returns
// a usage value it is copied into resourceUsage; the executor's error is
// returned either way.
func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error {
	usage, err := e.Impl.Stats()
	if usage == nil {
		return err
	}
	*resourceUsage = *usage
	return err
}
// Signal forwards the received signal to the wrapped executor and returns its
// error. resp is left untouched.
func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error {
	err := e.Impl.Signal(args)
	return err
}
// Exec runs the requested command through the wrapped executor. The output
// and exit code are always written to result, even when the executor reports
// an error, and that error is returned to the caller.
func (e *ExecutorRPCServer) Exec(args ExecArgs, result *ExecReturn) error {
	output, exitCode, err := e.Impl.Exec(args.Deadline, args.Name, args.Args)
	*result = ExecReturn{Output: output, Code: exitCode}
	return err
}
// ExecutorPlugin is the go-plugin definition of the executor. Its methods
// build the server and client halves of the plugin connection.
type ExecutorPlugin struct {
// TODO: support backwards compatibility with pre 0.9 NetRPC plugin
plugin.NetRPCUnsupportedPlugin
// logger is handed to the executors and RPC wrappers this plugin creates.
logger hclog.Logger
// fsIsolation selects executor.NewExecutorWithIsolation over
// executor.NewExecutor when the server side is built.
fsIsolation bool
// Impl caches the net/rpc server wrapper once Server has created it.
Impl *ExecutorRPCServer
}
// NOTE(review): the lines below interleave two method bodies from a diff whose
// +/- markers appear to have been lost: a net/rpc Server method that lazily
// builds p.Impl as an ExecutorRPCServer, and a GRPCServer method that
// registers a grpcExecutorServer on the supplied *grpc.Server. Both choose
// executor.NewExecutorWithIsolation when p.fsIsolation is set and
// executor.NewExecutor otherwise. As written the span does not parse; confirm
// which of the two methods is meant to remain.
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
if p.fsIsolation {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutorWithIsolation(p.logger), logger: p.logger}
} else {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger), logger: p.logger}
}
func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
if p.fsIsolation {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: executor.NewExecutorWithIsolation(p.logger)})
} else {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: executor.NewExecutor(p.logger)})
}
return p.Impl, nil
return nil
}
// NOTE(review): the lines below interleave two method bodies from a diff whose
// +/- markers appear to have been lost: a net/rpc Client method that wraps the
// *rpc.Client in an ExecutorRPC, and a GRPCClient method that wraps the
// *grpc.ClientConn in a grpcExecutorClient and stores ctx as its doneCtx. As
// written the span does not parse; confirm which of the two methods is meant
// to remain.
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &ExecutorRPC{client: c, logger: p.logger}, nil
func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcExecutorClient{
client: proto.NewExecutorClient(c),
doneCtx: ctx,
}, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,96 @@
syntax = "proto3";
package hashicorp.nomad.plugins.executor.proto;
option go_package = "proto";
import "google/protobuf/timestamp.proto";
import "github.com/hashicorp/nomad/plugins/drivers/proto/driver.proto";
// Executor is the gRPC service through which a driver launches and supervises
// a process in the executor plugin.
service Executor {
// Launch starts the command described by the request and returns the state
// of the launched process.
rpc Launch(LaunchRequest) returns (LaunchResponse) {}
// Wait returns the process state reported by the executor's Wait.
rpc Wait(WaitRequest) returns (WaitResponse) {}
// Shutdown stops the executor using the given signal and grace period.
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {}
// UpdateResources passes a new set of resources to the executor.
rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesResponse) {}
// Version returns the executor's version string.
rpc Version(VersionRequest) returns (VersionResponse) {}
// Stats returns the task's resource usage statistics.
rpc Stats(StatsRequest) returns (StatsResponse) {}
// Signal sends the named signal through the executor.
rpc Signal(SignalRequest) returns (SignalResponse) {}
// Exec runs a command through the executor and returns its output and exit
// code.
rpc Exec(ExecRequest) returns (ExecResponse) {}
}
// LaunchRequest describes the command to launch. The server copies each field
// into the like-named field of the executor's ExecCommand.
message LaunchRequest {
// cmd is the command to run.
string cmd = 1;
// args are the arguments passed to cmd.
repeated string args = 2;
// resources is converted to the executor's Resources; when unset the server
// substitutes an empty Resources value.
Resources resources = 3;
// stdout_path is copied to ExecCommand.StdoutPath.
string stdout_path = 4;
// stderr_path is copied to ExecCommand.StderrPath.
string stderr_path = 5;
// env is copied to ExecCommand.Env.
repeated string env = 6;
// user is copied to ExecCommand.User.
string user = 7;
// task_dir is copied to ExecCommand.TaskDir.
string task_dir = 8;
// resource_limits is copied to ExecCommand.ResourceLimits.
bool resource_limits = 9;
// basic_process_cgroup is copied to ExecCommand.BasicProcessCgroup.
bool basic_process_cgroup = 10;
}
// LaunchResponse carries the state of the launched process.
message LaunchResponse {
ProcessState process = 1;
}
// WaitRequest has no fields.
message WaitRequest {}
// WaitResponse carries the process state returned by the executor's Wait.
message WaitResponse{
ProcessState process = 1;
}
// ShutdownRequest carries the arguments of the executor's Shutdown.
message ShutdownRequest {
// signal is the signal name passed to Shutdown.
string signal = 1;
// grace_period is converted directly to a Go time.Duration by the server, so
// it is expressed in nanoseconds.
int64 grace_period = 2;
}
// ShutdownResponse has no fields.
message ShutdownResponse {}
// UpdateResourcesRequest carries the new resources for the executor.
message UpdateResourcesRequest{
// resources is converted to the executor's Resources; when unset the server
// substitutes an empty Resources value.
Resources resources = 1;
}
// UpdateResourcesResponse has no fields.
message UpdateResourcesResponse {}
// VersionRequest has no fields.
message VersionRequest {}
// VersionResponse carries the executor's version string.
message VersionResponse{
string version = 1;
}
// StatsRequest has no fields.
message StatsRequest {}
// StatsResponse carries the task statistics, using the TaskStats message
// imported from the drivers proto package.
message StatsResponse {
hashicorp.nomad.plugins.drivers.proto.TaskStats stats = 1;
}
// SignalRequest names the signal to send.
message SignalRequest {
// signal is looked up by the server in its table of signal names; a value
// that is not found is rejected as invalid.
string signal = 1;
}
// SignalResponse has no fields.
message SignalResponse {}
// ExecRequest describes a command to run through the executor's Exec.
message ExecRequest {
// deadline is converted to a Go time.Time and passed to Exec; a value that
// fails the conversion is rejected by the server.
google.protobuf.Timestamp deadline = 1;
// cmd is the command to run.
string cmd = 2;
// args are the arguments passed to cmd.
repeated string args = 3;
}
// ExecResponse carries the result of Exec.
message ExecResponse {
// output is the byte output returned by Exec.
bytes output = 1;
// exit_code is the exit code returned by Exec.
int32 exit_code = 2;
}
// Resources mirrors the executor's Resources struct; each field is converted
// to and from the Go int field of the corresponding name.
message Resources {
// cpu maps to Resources.CPU.
int32 cpu = 1;
// memoryMB maps to Resources.MemoryMB.
int32 memoryMB = 2;
// diskMB maps to Resources.DiskMB.
int32 diskMB = 3;
// iops maps to Resources.IOPS.
int32 iops = 4;
}
// ProcessState mirrors the executor's ProcessState struct.
message ProcessState {
// pid maps to ProcessState.Pid.
int32 pid = 1;
// exit_code maps to ProcessState.ExitCode.
int32 exit_code = 2;
// signal maps to ProcessState.Signal.
int32 signal = 3;
// time maps to ProcessState.Time; the receiver returns an error when it
// cannot be converted to a Go time.Time.
google.protobuf.Timestamp time = 4;
}

132
plugins/executor/server.go Normal file
View File

@@ -0,0 +1,132 @@
package executor
import (
"fmt"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/executor/proto"
"golang.org/x/net/context"
)
// grpcExecutorServer is the gRPC server side of the executor plugin. Each RPC
// handler converts the protobuf request, delegates to impl, and converts the
// result back.
type grpcExecutorServer struct {
// impl is the executor that performs the work.
impl structs.Executor
// doneCtx is not read by the handlers visible here.
// NOTE(review): confirm where it is meant to be set and used.
doneCtx context.Context
}
// Launch translates the request into a structs.ExecCommand, starts it through
// the wrapped executor and returns the resulting process state in protobuf
// form. Errors from the executor or from the state conversion are returned
// with a nil response.
func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) {
	cmd := &structs.ExecCommand{
		Cmd:                req.Cmd,
		Args:               req.Args,
		Resources:          resourcesFromProto(req.Resources),
		StdoutPath:         req.StdoutPath,
		StderrPath:         req.StderrPath,
		Env:                req.Env,
		User:               req.User,
		TaskDir:            req.TaskDir,
		ResourceLimits:     req.ResourceLimits,
		BasicProcessCgroup: req.BasicProcessCgroup,
	}

	state, err := s.impl.Launch(cmd)
	if err != nil {
		return nil, err
	}

	pbState, err := processStateToProto(state)
	if err != nil {
		return nil, err
	}
	return &proto.LaunchResponse{Process: pbState}, nil
}
// Wait calls the wrapped executor's Wait with the request context and returns
// the resulting process state in protobuf form. Errors from the executor or
// from the state conversion are returned with a nil response.
func (s *grpcExecutorServer) Wait(ctx context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) {
	state, err := s.impl.Wait(ctx)
	if err != nil {
		return nil, err
	}

	pbState, err := processStateToProto(state)
	if err != nil {
		return nil, err
	}
	return &proto.WaitResponse{Process: pbState}, nil
}
// Shutdown calls the wrapped executor's Shutdown with the requested signal
// name. The request's grace period is converted directly to a time.Duration,
// so it is interpreted as nanoseconds.
func (s *grpcExecutorServer) Shutdown(ctx context.Context, req *proto.ShutdownRequest) (*proto.ShutdownResponse, error) {
	grace := time.Duration(req.GracePeriod)
	err := s.impl.Shutdown(req.Signal, grace)
	if err != nil {
		return nil, err
	}
	return &proto.ShutdownResponse{}, nil
}
// UpdateResources converts the requested resources from protobuf and passes
// them to the wrapped executor, returning an empty response on success.
func (s *grpcExecutorServer) UpdateResources(ctx context.Context, req *proto.UpdateResourcesRequest) (*proto.UpdateResourcesResponse, error) {
	resources := resourcesFromProto(req.Resources)
	err := s.impl.UpdateResources(resources)
	if err != nil {
		return nil, err
	}
	return &proto.UpdateResourcesResponse{}, nil
}
// Version returns the wrapped executor's version string. The request carries
// no data and is ignored.
func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*proto.VersionResponse, error) {
	ver, err := s.impl.Version()
	if err != nil {
		return nil, err
	}

	resp := &proto.VersionResponse{Version: ver.Version}
	return resp, nil
}
// Stats collects resource usage from the wrapped executor and converts it to
// protobuf with drivers.TaskStatsToProto. Errors from either step are returned
// with a nil response. The request carries no data and is ignored.
func (s *grpcExecutorServer) Stats(context.Context, *proto.StatsRequest) (*proto.StatsResponse, error) {
	usage, err := s.impl.Stats()
	if err != nil {
		return nil, err
	}

	converted, err := drivers.TaskStatsToProto(usage)
	if err != nil {
		return nil, err
	}
	return &proto.StatsResponse{Stats: converted}, nil
}
// Signal forwards the signal named in the request to the wrapped executor.
//
// The name is first resolved through signals.SignalLookup. A client that
// builds the request from os.Signal.String() sends the signal's String() form
// instead of a lookup key, so when the direct lookup misses, the known signals
// are scanned for one whose String() matches the request. An error naming the
// rejected value is returned when neither form identifies a signal.
func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) {
	sig, ok := signals.SignalLookup[req.Signal]
	if !ok {
		// Fall back to matching on the String() form; several lookup keys may
		// share one signal value, in which case any of them is equivalent.
		for _, candidate := range signals.SignalLookup {
			if candidate.String() == req.Signal {
				sig, ok = candidate, true
				break
			}
		}
	}
	if !ok {
		return nil, fmt.Errorf("invalid signal sent by client: %q", req.Signal)
	}

	if err := s.impl.Signal(sig); err != nil {
		return nil, err
	}
	return &proto.SignalResponse{}, nil
}
// Exec converts the request deadline to a time.Time, runs the command through
// the wrapped executor and returns its output and exit code. A deadline that
// cannot be converted, or an error from the executor, is returned with a nil
// response.
func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (*proto.ExecResponse, error) {
	deadline, err := ptypes.Timestamp(req.Deadline)
	if err != nil {
		return nil, err
	}

	output, code, err := s.impl.Exec(deadline, req.Cmd, req.Args)
	if err != nil {
		return nil, err
	}

	resp := &proto.ExecResponse{
		Output:   output,
		ExitCode: int32(code),
	}
	return resp, nil
}

62
plugins/executor/utils.go Normal file
View File

@@ -0,0 +1,62 @@
package executor
import (
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/nomad/drivers/shared/executor/structs"
"github.com/hashicorp/nomad/plugins/executor/proto"
)
// processStateToProto converts an executor ProcessState into its protobuf
// form. A nil ps is treated as a zero-valued ProcessState instead of being
// dereferenced, so a handler whose executor returned no state does not panic.
// An error is returned when ps.Time cannot be represented as a protobuf
// timestamp.
func processStateToProto(ps *structs.ProcessState) (*proto.ProcessState, error) {
	if ps == nil {
		ps = &structs.ProcessState{}
	}

	timestamp, err := ptypes.TimestampProto(ps.Time)
	if err != nil {
		return nil, err
	}

	return &proto.ProcessState{
		Pid:      int32(ps.Pid),
		ExitCode: int32(ps.ExitCode),
		Signal:   int32(ps.Signal),
		Time:     timestamp,
	}, nil
}
// processStateFromProto converts a protobuf ProcessState into the executor's
// ProcessState. A nil pb yields an empty ProcessState, matching how
// resourcesFromProto treats a nil message, instead of being dereferenced. An
// error is returned when pb.Time cannot be converted to a time.Time.
func processStateFromProto(pb *proto.ProcessState) (*structs.ProcessState, error) {
	if pb == nil {
		return &structs.ProcessState{}, nil
	}

	timestamp, err := ptypes.Timestamp(pb.Time)
	if err != nil {
		return nil, err
	}

	return &structs.ProcessState{
		Pid:      int(pb.Pid),
		ExitCode: int(pb.ExitCode),
		Signal:   int(pb.Signal),
		Time:     timestamp,
	}, nil
}
// resourcesToProto converts executor Resources into their protobuf form. A nil
// input produces an empty, non-nil message.
func resourcesToProto(r *structs.Resources) *proto.Resources {
	pb := &proto.Resources{}
	if r != nil {
		pb.Cpu = int32(r.CPU)
		pb.MemoryMB = int32(r.MemoryMB)
		pb.DiskMB = int32(r.DiskMB)
		pb.Iops = int32(r.IOPS)
	}
	return pb
}
// resourcesFromProto converts protobuf Resources into the executor's Resources
// struct. A nil message produces an empty, non-nil struct.
func resourcesFromProto(pb *proto.Resources) *structs.Resources {
	res := &structs.Resources{}
	if pb != nil {
		res.CPU = int(pb.Cpu)
		res.MemoryMB = int(pb.MemoryMB)
		res.DiskMB = int(pb.DiskMB)
		res.IOPS = int(pb.Iops)
	}
	return res
}