From 467930f650d123322df15f877607fd00a9fce820 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Wed, 5 Dec 2018 11:03:56 -0500 Subject: [PATCH] executor: use grpc instead of netrpc as plugin protocol * Added protobuf spec for executor * Seperated executor structs into their own package --- command/executor_plugin.go | 1 + drivers/shared/executor/executor.go | 184 +-- drivers/shared/executor/executor_basic.go | 7 +- drivers/shared/executor/executor_linux.go | 41 +- .../shared/executor/executor_linux_test.go | 17 +- drivers/shared/executor/executor_test.go | 22 +- drivers/shared/executor/structs/structs.go | 177 +++ plugins/drivers/client.go | 2 +- plugins/drivers/server.go | 2 +- plugins/drivers/utils.go | 4 +- plugins/drivers/utils/utils.go | 14 +- plugins/executor/client.go | 145 ++ plugins/executor/executor_plugin.go | 183 +-- plugins/executor/proto/executor.pb.go | 1201 +++++++++++++++++ plugins/executor/proto/executor.proto | 96 ++ plugins/executor/server.go | 132 ++ plugins/executor/utils.go | 62 + 17 files changed, 1911 insertions(+), 379 deletions(-) create mode 100644 drivers/shared/executor/structs/structs.go create mode 100644 plugins/executor/client.go create mode 100644 plugins/executor/proto/executor.pb.go create mode 100644 plugins/executor/proto/executor.proto create mode 100644 plugins/executor/server.go create mode 100644 plugins/executor/utils.go diff --git a/command/executor_plugin.go b/command/executor_plugin.go index e9741fb90..666d1fb47 100644 --- a/command/executor_plugin.go +++ b/command/executor_plugin.go @@ -49,6 +49,7 @@ func (e *ExecutorPluginCommand) Run(args []string) int { hclog.LevelFromString(executorConfig.LogLevel), executorConfig.FSIsolation, ), + GRPCServer: plugin.DefaultGRPCServer, }) return 0 } diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index 7239fd577..cb34ce3bd 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -3,8 +3,6 @@ package executor import ( "context" "fmt" - "io" - "io/ioutil" "os" "os/exec" "path/filepath" @@ -18,8 +16,8 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/lib/fifo" "github.com/hashicorp/nomad/client/stats" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/consul-template/signals" @@ -41,164 +39,14 @@ var ( ExecutorBasicMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"} ) -// Executor is the interface which allows a driver to launch and supervise -// a process -type Executor interface { - // Launch a user process configured by the given ExecCommand - Launch(launchCmd *ExecCommand) (*ProcessState, error) - - // Wait blocks until the process exits or an error occures - Wait() (*ProcessState, error) - - // Shutdown will shutdown the executor by stopping the user process, - // cleaning up and resources created by the executor. The shutdown sequence - // will first send the given signal to the process. This defaults to "SIGINT" - // if not specified. The executor will then wait for the process to exit - // before cleaning up other resources. If the executor waits longer than the - // given grace period, the process is forcefully killed. - // - // To force kill the user process, gracePeriod can be set to 0. - Shutdown(signal string, gracePeriod time.Duration) error - - // UpdateResources updates any resource isolation enforcement with new - // constraints if supported. - UpdateResources(*Resources) error - - // Version returns the executor API version - Version() (*ExecutorVersion, error) - - // Stats fetchs process usage stats for the executor and each pid if available - Stats() (*cstructs.TaskResourceUsage, error) - - // Signal sends the given signal to the user process - Signal(os.Signal) error - - // Exec executes the given command and args inside the executor context - // and returns the output and exit code. - Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) -} - -// Resources describes the resource isolation required -type Resources struct { - CPU int - MemoryMB int - DiskMB int - IOPS int -} - -// ExecCommand holds the user command, args, and other isolation related -// settings. -type ExecCommand struct { - // Cmd is the command that the user wants to run. - Cmd string - - // Args is the args of the command that the user wants to run. - Args []string - - // Resources defined by the task - Resources *Resources - - // StdoutPath is the path the procoess stdout should be written to - StdoutPath string - stdout io.WriteCloser - - // StderrPath is the path the procoess stderr should be written to - StderrPath string - stderr io.WriteCloser - - // Env is the list of KEY=val pairs of environment variables to be set - Env []string - - // User is the user which the executor uses to run the command. - User string - - // TaskDir is the directory path on the host where for the task - TaskDir string - - // ResourceLimits determines whether resource limits are enforced by the - // executor. - ResourceLimits bool - - // Cgroup marks whether we put the process in a cgroup. Setting this field - // doesn't enforce resource limits. To enforce limits, set ResourceLimits. - // Using the cgroup does allow more precise cleanup of processes. - BasicProcessCgroup bool -} - -type nopCloser struct { - io.Writer -} - -func (nopCloser) Close() error { return nil } - -// Stdout returns a writer for the configured file descriptor -func (c *ExecCommand) Stdout() (io.WriteCloser, error) { - if c.stdout == nil { - if c.StdoutPath != "" { - f, err := fifo.Open(c.StdoutPath) - if err != nil { - return nil, fmt.Errorf("failed to create stdout: %v", err) - } - c.stdout = f - } else { - c.stdout = nopCloser{ioutil.Discard} - } - } - return c.stdout, nil -} - -// Stderr returns a writer for the configured file descriptor -func (c *ExecCommand) Stderr() (io.WriteCloser, error) { - if c.stderr == nil { - if c.StderrPath != "" { - f, err := fifo.Open(c.StderrPath) - if err != nil { - return nil, fmt.Errorf("failed to create stderr: %v", err) - } - c.stderr = f - } else { - c.stderr = nopCloser{ioutil.Discard} - } - } - return c.stderr, nil -} - -func (c *ExecCommand) Close() { - stdout, err := c.Stdout() - if err == nil { - stdout.Close() - } - stderr, err := c.Stderr() - if err == nil { - stderr.Close() - } -} - -// ProcessState holds information about the state of a user process. -type ProcessState struct { - Pid int - ExitCode int - Signal int - Time time.Time -} - -// ExecutorVersion is the version of the executor -type ExecutorVersion struct { - Version string -} - -func (v *ExecutorVersion) GoString() string { - return v.Version -} - // UniversalExecutor is an implementation of the Executor which launches and // supervises processes. In addition to process supervision it provides resource // and file system isolation type UniversalExecutor struct { childCmd exec.Cmd - commandCfg *ExecCommand + commandCfg *structs.ExecCommand - exitState *ProcessState + exitState *structs.ProcessState processExited chan interface{} // resConCtx is used to track and cleanup additional resources created by @@ -214,7 +62,7 @@ type UniversalExecutor struct { } // NewExecutor returns an Executor -func NewExecutor(logger hclog.Logger) Executor { +func NewExecutor(logger hclog.Logger) structs.Executor { logger = logger.Named("executor") if err := shelpers.Init(); err != nil { logger.Error("unable to initialize stats", "error", err) @@ -230,13 +78,13 @@ func NewExecutor(logger hclog.Logger) Executor { } // Version returns the api version of the executor -func (e *UniversalExecutor) Version() (*ExecutorVersion, error) { - return &ExecutorVersion{Version: ExecutorVersionLatest}, nil +func (e *UniversalExecutor) Version() (*structs.ExecutorVersion, error) { + return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil } // Launch launches the main process and returns its state. It also // configures an applies isolation on certain platforms. -func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) { +func (e *UniversalExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) { e.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) e.commandCfg = command @@ -298,7 +146,7 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) go e.pidCollector.collectPids(e.processExited, getAllPids) go e.wait() - return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil + return &structs.ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil } // Exec a command inside a container for exec and java drivers. @@ -346,12 +194,16 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy } // Wait waits until a process has exited and returns it's exitcode and errors -func (e *UniversalExecutor) Wait() (*ProcessState, error) { - <-e.processExited - return e.exitState, nil +func (e *UniversalExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.processExited: + return e.exitState, nil + } } -func (e *UniversalExecutor) UpdateResources(resources *Resources) error { +func (e *UniversalExecutor) UpdateResources(resources *structs.Resources) error { return nil } @@ -360,7 +212,7 @@ func (e *UniversalExecutor) wait() { pid := e.childCmd.Process.Pid err := e.childCmd.Wait() if err == nil { - e.exitState = &ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} + e.exitState = &structs.ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} return } @@ -388,7 +240,7 @@ func (e *UniversalExecutor) wait() { e.logger.Warn("unexpected Cmd.Wait() error type", "error", err) } - e.exitState = &ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} + e.exitState = &structs.ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} } var ( diff --git a/drivers/shared/executor/executor_basic.go b/drivers/shared/executor/executor_basic.go index 1951ae584..ccd5172ff 100644 --- a/drivers/shared/executor/executor_basic.go +++ b/drivers/shared/executor/executor_basic.go @@ -2,9 +2,12 @@ package executor -import hclog "github.com/hashicorp/go-hclog" +import ( + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" +) -func NewExecutorWithIsolation(logger hclog.Logger) Executor { +func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor { logger = logger.Named("executor") logger.Error("isolation executor is not supported on this platform, using default") return NewExecutor(logger) diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index 989cea353..f5182d0c9 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -19,6 +19,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" "github.com/hashicorp/nomad/helper/discover" shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/helper/uuid" @@ -61,7 +62,7 @@ func init() { // LibcontainerExecutor implements an Executor with the runc/libcontainer api type LibcontainerExecutor struct { id string - command *ExecCommand + command *structs.ExecCommand logger hclog.Logger @@ -73,10 +74,10 @@ type LibcontainerExecutor struct { container libcontainer.Container userProc *libcontainer.Process userProcExited chan interface{} - exitState *ProcessState + exitState *structs.ProcessState } -func NewExecutorWithIsolation(logger hclog.Logger) Executor { +func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor { logger = logger.Named("isolated_executor") if err := shelpers.Init(); err != nil { logger.Error("unable to initialize stats", "error", err) @@ -92,7 +93,7 @@ func NewExecutorWithIsolation(logger hclog.Logger) Executor { } // Launch creates a new container in libcontainer and starts a new process with it -func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) { +func (l *LibcontainerExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) { l.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) // Find the nomad executable to launch the executor process with bin, err := discover.NomadExecutable() @@ -101,7 +102,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro } if command.Resources == nil { - command.Resources = &Resources{} + command.Resources = &structs.Resources{} } l.command = command @@ -206,7 +207,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro go l.pidCollector.collectPids(l.userProcExited, l.getAllPids) go l.wait() - return &ProcessState{ + return &structs.ProcessState{ Pid: pid, ExitCode: -1, Time: time.Now(), @@ -231,9 +232,13 @@ func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) { } // Wait waits until a process has exited and returns it's exitcode and errors -func (l *LibcontainerExecutor) Wait() (*ProcessState, error) { - <-l.userProcExited - return l.exitState, nil +func (l *LibcontainerExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-l.userProcExited: + return l.exitState, nil + } } func (l *LibcontainerExecutor) wait() { @@ -247,7 +252,7 @@ func (l *LibcontainerExecutor) wait() { ps = exitErr.ProcessState } else { l.logger.Error("failed to call wait on user process", "error", err) - l.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()} + l.exitState = &structs.ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()} return } } @@ -265,7 +270,7 @@ func (l *LibcontainerExecutor) wait() { } } - l.exitState = &ProcessState{ + l.exitState = &structs.ProcessState{ Pid: ps.Pid(), ExitCode: exitCode, Signal: signal, @@ -327,13 +332,13 @@ func (l *LibcontainerExecutor) Shutdown(signal string, grace time.Duration) erro } // UpdateResources updates the resource isolation with new values to be enforced -func (l *LibcontainerExecutor) UpdateResources(resources *Resources) error { +func (l *LibcontainerExecutor) UpdateResources(resources *structs.Resources) error { return nil } // Version returns the api version of the executor -func (l *LibcontainerExecutor) Version() (*ExecutorVersion, error) { - return &ExecutorVersion{Version: ExecutorVersionLatest}, nil +func (l *LibcontainerExecutor) Version() (*structs.ExecutorVersion, error) { + return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil } // Stats returns the resource statistics for processes managed by the executor @@ -453,7 +458,7 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc ch <- &waitResult{ps, err} } -func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { +func configureCapabilities(cfg *lconfigs.Config, command *structs.ExecCommand) { // TODO: allow better control of these cfg.Capabilities = &lconfigs.Capabilities{ Bounding: allCaps, @@ -465,7 +470,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { } -func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) { +func configureIsolation(cfg *lconfigs.Config, command *structs.ExecCommand) { defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV // set the new root directory for the container @@ -531,7 +536,7 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) { } } -func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error { +func configureCgroups(cfg *lconfigs.Config, command *structs.ExecCommand) error { // If resources are not limited then manually create cgroups needed if !command.ResourceLimits { @@ -597,7 +602,7 @@ func configureBasicCgroups(cfg *lconfigs.Config) error { return nil } -func newLibcontainerConfig(command *ExecCommand) *lconfigs.Config { +func newLibcontainerConfig(command *structs.ExecCommand) *lconfigs.Config { cfg := &lconfigs.Config{ Cgroups: &lconfigs.Cgroup{ Resources: &lconfigs.Resources{ diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index de7bf54f4..3b8219396 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ b/drivers/shared/executor/executor_linux_test.go @@ -15,6 +15,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/testutil" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" tu "github.com/hashicorp/nomad/testutil" @@ -25,7 +26,7 @@ func init() { executorFactories["LibcontainerExecutor"] = libcontainerFactory } -func libcontainerFactory(l hclog.Logger) Executor { +func libcontainerFactory(l hclog.Logger) structs.Executor { return NewExecutorWithIsolation(l) } @@ -33,7 +34,7 @@ func libcontainerFactory(l hclog.Logger) Executor { // chroot. Use testExecutorContext if you don't need a chroot. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { +func testExecutorCommandWithChroot(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) { chrootEnv := map[string]string{ "/etc/ld.so.cache": "/etc/ld.so.cache", "/etc/ld.so.conf": "/etc/ld.so.conf", @@ -61,10 +62,10 @@ func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocD t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] - cmd := &ExecCommand{ + cmd := &structs.ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, - Resources: &Resources{ + Resources: &structs.Resources{ CPU: task.Resources.CPU, MemoryMB: task.Resources.MemoryMB, IOPS: task.Resources.IOPS, @@ -143,7 +144,8 @@ ld.so.cache ld.so.conf ld.so.conf.d/` tu.WaitForResult(func() (bool, error) { - output := execCmd.stdout.(*bufferCloser).String() + outWriter, _ := execCmd.GetWriters() + output := outWriter.(*bufferCloser).String() act := strings.TrimSpace(string(output)) if act != expected { return false, fmt.Errorf("Command output incorrectly: want %v; got %v", expected, act) @@ -176,9 +178,10 @@ func TestExecutor_ClientCleanup(t *testing.T) { require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond)) executor.Wait() - output := execCmd.stdout.(*bufferCloser).String() + outWriter, _ := execCmd.GetWriters() + output := outWriter.(*bufferCloser).String() require.NotZero(len(output)) time.Sleep(2 * time.Second) - output1 := execCmd.stdout.(*bufferCloser).String() + output1 := outWriter.(*bufferCloser).String() require.Equal(len(output), len(output1)) } diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 1c274a382..109ae6641 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" tu "github.com/hashicorp/nomad/testutil" hclog "github.com/hashicorp/go-hclog" @@ -22,8 +23,8 @@ import ( "github.com/stretchr/testify/require" ) -var executorFactories = map[string]func(hclog.Logger) Executor{} -var universalFactory = func(l hclog.Logger) Executor { +var executorFactories = map[string]func(hclog.Logger) structs.Executor{} +var universalFactory = func(l hclog.Logger) structs.Executor { return NewExecutor(l) } @@ -34,7 +35,7 @@ func init() { // testExecutorContext returns an ExecutorContext and AllocDir. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { +func testExecutorCommand(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] taskEnv := taskenv.NewBuilder(mock.Node(), alloc, task, "global").Build() @@ -48,10 +49,10 @@ func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] - cmd := &ExecCommand{ + cmd := &structs.ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, - Resources: &Resources{ + Resources: &structs.Resources{ CPU: task.Resources.CPU, MemoryMB: task.Resources.MemoryMB, IOPS: task.Resources.IOPS, @@ -68,9 +69,8 @@ type bufferCloser struct { func (_ *bufferCloser) Close() error { return nil } -func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) { - cmd.stdout = &stdout - cmd.stderr = &stderr +func configureTLogging(cmd *structs.ExecCommand) (stdout bufferCloser, stderr bufferCloser) { + cmd.SetWriters(&stdout, &stderr) return } @@ -140,7 +140,8 @@ func TestExecutor_Start_Wait(pt *testing.T) { expected := "hello world" tu.WaitForResult(func() (bool, error) { - output := execCmd.stdout.(*bufferCloser).String() + outWriter, _ := execCmd.GetWriters() + output := outWriter.(*bufferCloser).String() act := strings.TrimSpace(string(output)) if expected != act { return false, fmt.Errorf("expected: '%s' actual: '%s'", expected, act) @@ -207,7 +208,8 @@ func TestExecutor_Start_Kill(pt *testing.T) { require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond)) time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) - output := execCmd.stdout.(*bufferCloser).String() + outWriter, _ := execCmd.GetWriters() + output := outWriter.(*bufferCloser).String() expected := "" act := strings.TrimSpace(string(output)) if act != expected { diff --git a/drivers/shared/executor/structs/structs.go b/drivers/shared/executor/structs/structs.go new file mode 100644 index 000000000..56495f7ab --- /dev/null +++ b/drivers/shared/executor/structs/structs.go @@ -0,0 +1,177 @@ +package structs + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "time" + + "github.com/hashicorp/nomad/client/lib/fifo" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + // Launch a user process configured by the given ExecCommand + Launch(launchCmd *ExecCommand) (*ProcessState, error) + + // Wait blocks until the process exits or an error occures + Wait(ctx context.Context) (*ProcessState, error) + + // Shutdown will shutdown the executor by stopping the user process, + // cleaning up and resources created by the executor. The shutdown sequence + // will first send the given signal to the process. This defaults to "SIGINT" + // if not specified. The executor will then wait for the process to exit + // before cleaning up other resources. If the executor waits longer than the + // given grace period, the process is forcefully killed. + // + // To force kill the user process, gracePeriod can be set to 0. + Shutdown(signal string, gracePeriod time.Duration) error + + // UpdateResources updates any resource isolation enforcement with new + // constraints if supported. + UpdateResources(*Resources) error + + // Version returns the executor API version + Version() (*ExecutorVersion, error) + + // Stats fetchs process usage stats for the executor and each pid if available + Stats() (*cstructs.TaskResourceUsage, error) + + // Signal sends the given signal to the user process + Signal(os.Signal) error + + // Exec executes the given command and args inside the executor context + // and returns the output and exit code. + Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) +} + +// Resources describes the resource isolation required +type Resources struct { + CPU int + MemoryMB int + DiskMB int + IOPS int +} + +// ExecCommand holds the user command, args, and other isolation related +// settings. +type ExecCommand struct { + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. + Args []string + + // Resources defined by the task + Resources *Resources + + // StdoutPath is the path the process stdout should be written to + StdoutPath string + stdout io.WriteCloser + + // StderrPath is the path the process stderr should be written to + StderrPath string + stderr io.WriteCloser + + // Env is the list of KEY=val pairs of environment variables to be set + Env []string + + // User is the user which the executor uses to run the command. + User string + + // TaskDir is the directory path on the host where for the task + TaskDir string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool + + // Cgroup marks whether we put the process in a cgroup. Setting this field + // doesn't enforce resource limits. To enforce limits, set ResourceLimits. + // Using the cgroup does allow more precise cleanup of processes. + BasicProcessCgroup bool +} + +// SetWriters sets the writer for the process stdout and stderr. This should +// not be used if writing to a file path such as a fifo file. SetStdoutWriter +// is mainly used for unit testing purposes. +func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) { + c.stdout = out + c.stderr = err +} + +// GetWriters returns the unexported io.WriteCloser for the stdout and stderr +// handles. This is mainly used for unit testing purposes. +func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) { + return c.stdout, c.stderr +} + +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } + +// Stdout returns a writer for the configured file descriptor +func (c *ExecCommand) Stdout() (io.WriteCloser, error) { + if c.stdout == nil { + if c.StdoutPath != "" { + f, err := fifo.Open(c.StdoutPath) + if err != nil { + return nil, fmt.Errorf("failed to create stdout: %v", err) + } + c.stdout = f + } else { + c.stdout = nopCloser{ioutil.Discard} + } + } + return c.stdout, nil +} + +// Stderr returns a writer for the configured file descriptor +func (c *ExecCommand) Stderr() (io.WriteCloser, error) { + if c.stderr == nil { + if c.StderrPath != "" { + f, err := fifo.Open(c.StderrPath) + if err != nil { + return nil, fmt.Errorf("failed to create stderr: %v", err) + } + c.stderr = f + } else { + c.stderr = nopCloser{ioutil.Discard} + } + } + return c.stderr, nil +} + +func (c *ExecCommand) Close() { + stdout, err := c.Stdout() + if err == nil { + stdout.Close() + } + stderr, err := c.Stderr() + if err == nil { + stderr.Close() + } +} + +// ProcessState holds information about the state of a user process. +type ProcessState struct { + Pid int + ExitCode int + Signal int + Time time.Time +} + +// ExecutorVersion is the version of the executor +type ExecutorVersion struct { + Version string +} + +func (v *ExecutorVersion) GoString() string { + return v.Version +} diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go index 83200c160..d8a1415fb 100644 --- a/plugins/drivers/client.go +++ b/plugins/drivers/client.go @@ -265,7 +265,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa return nil, err } - stats, err := taskStatsFromProto(resp.Stats) + stats, err := TaskStatsFromProto(resp.Stats) if err != nil { return nil, err } diff --git a/plugins/drivers/server.go b/plugins/drivers/server.go index deff26e12..f24072a3c 100644 --- a/plugins/drivers/server.go +++ b/plugins/drivers/server.go @@ -230,7 +230,7 @@ func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStats return nil, err } - pb, err := taskStatsToProto(stats) + pb, err := TaskStatsToProto(stats) if err != nil { return nil, fmt.Errorf("failed to encode task stats: %v", err) } diff --git a/plugins/drivers/utils.go b/plugins/drivers/utils.go index 090ff014a..d8ea6f605 100644 --- a/plugins/drivers/utils.go +++ b/plugins/drivers/utils.go @@ -373,7 +373,7 @@ func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) { }, nil } -func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) { +func TaskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) { timestamp, err := ptypes.TimestampProto(time.Unix(0, stats.Timestamp)) if err != nil { return nil, err @@ -391,7 +391,7 @@ func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, erro }, nil } -func taskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) { +func TaskStatsFromProto(pb *proto.TaskStats) (*cstructs.TaskResourceUsage, error) { timestamp, err := ptypes.Timestamp(pb.Timestamp) if err != nil { return nil, err diff --git a/plugins/drivers/utils/utils.go b/plugins/drivers/utils/utils.go index c11fd2a8b..1060dfce2 100644 --- a/plugins/drivers/utils/utils.go +++ b/plugins/drivers/utils/utils.go @@ -14,7 +14,7 @@ import ( "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/drivers/shared/executor" + estructs "github.com/hashicorp/nomad/drivers/shared/executor/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" @@ -64,7 +64,7 @@ func CgroupsMounted(node *structs.Node) bool { // CreateExecutor launches an executor plugin and returns an instance of the // Executor interface func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDriverConfig, - executorConfig *pexecutor.ExecutorConfig) (executor.Executor, *plugin.Client, error) { + executorConfig *pexecutor.ExecutorConfig) (estructs.Executor, *plugin.Client, error) { c, err := json.Marshal(executorConfig) if err != nil { @@ -80,6 +80,7 @@ func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDri } config.HandshakeConfig = base.Handshake config.Plugins = pexecutor.GetPluginMap(w, level, executorConfig.FSIsolation) + config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC} if driverConfig != nil { config.MaxPort = driverConfig.ClientMaxPort @@ -105,17 +106,20 @@ func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDri if err != nil { return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) } - executorPlugin := raw.(executor.Executor) + executorPlugin := raw.(estructs.Executor) return executorPlugin, executorClient, nil } // CreateExecutorWithConfig launches a plugin with a given plugin config -func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) { +func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (estructs.Executor, *plugin.Client, error) { config.HandshakeConfig = base.Handshake // Setting this to DEBUG since the log level at the executor server process // is already set, and this effects only the executor client. + // TODO: Use versioned plugin map to support backwards compatability with + // existing pre-0.9 executors config.Plugins = pexecutor.GetPluginMap(w, hclog.Debug, false) + config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC} executorClient := plugin.NewClient(config) rpcClient, err := executorClient.Client() @@ -127,7 +131,7 @@ func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executo if err != nil { return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) } - executorPlugin, ok := raw.(*pexecutor.ExecutorRPC) + executorPlugin, ok := raw.(estructs.Executor) if !ok { return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw) } diff --git a/plugins/executor/client.go b/plugins/executor/client.go new file mode 100644 index 000000000..08b429d95 --- /dev/null +++ b/plugins/executor/client.go @@ -0,0 +1,145 @@ +package executor + +import ( + "context" + "os" + "time" + + "github.com/LK4D4/joincontext" + "github.com/golang/protobuf/ptypes" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/executor/proto" +) + +var _ structs.Executor = (*grpcExecutorClient)(nil) + +type grpcExecutorClient struct { + client proto.ExecutorClient + + // doneCtx is close when the plugin exits + doneCtx context.Context +} + +func (c *grpcExecutorClient) Launch(cmd *structs.ExecCommand) (*structs.ProcessState, error) { + ctx := context.Background() + req := &proto.LaunchRequest{ + Cmd: cmd.Cmd, + Args: cmd.Args, + Resources: resourcesToProto(cmd.Resources), + StdoutPath: cmd.StdoutPath, + StderrPath: cmd.StderrPath, + Env: cmd.Env, + User: cmd.User, + TaskDir: cmd.TaskDir, + ResourceLimits: cmd.ResourceLimits, + BasicProcessCgroup: cmd.BasicProcessCgroup, + } + resp, err := c.client.Launch(ctx, req) + if err != nil { + return nil, err + } + + ps, err := processStateFromProto(resp.Process) + if err != nil { + return nil, err + } + return ps, nil +} + +func (c *grpcExecutorClient) Wait(ctx context.Context) (*structs.ProcessState, error) { + // Join the passed context and the shutdown context + ctx, _ = joincontext.Join(ctx, c.doneCtx) + + resp, err := c.client.Wait(ctx, &proto.WaitRequest{}) + if err != nil { + return nil, err + } + + ps, err := processStateFromProto(resp.Process) + if err != nil { + return nil, err + } + + return ps, nil +} + +func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) error { + ctx := context.Background() + req := &proto.ShutdownRequest{ + Signal: signal, + GracePeriod: gracePeriod.Nanoseconds(), + } + if _, err := c.client.Shutdown(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) UpdateResources(r *structs.Resources) error { + ctx := context.Background() + req := &proto.UpdateResourcesRequest{Resources: resourcesToProto(r)} + if _, err := c.client.UpdateResources(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) Version() (*structs.ExecutorVersion, error) { + ctx := context.Background() + resp, err := c.client.Version(ctx, &proto.VersionRequest{}) + if err != nil { + return nil, err + } + return &structs.ExecutorVersion{Version: resp.Version}, nil +} + +func (c *grpcExecutorClient) Stats() (*cstructs.TaskResourceUsage, error) { + ctx := context.Background() + resp, err := c.client.Stats(ctx, &proto.StatsRequest{}) + if err != nil { + return nil, err + } + + stats, err := drivers.TaskStatsFromProto(resp.Stats) + if err != nil { + return nil, err + } + return stats, nil + +} + +func (c *grpcExecutorClient) Signal(s os.Signal) error { + ctx := context.Background() + req := &proto.SignalRequest{ + Signal: s.String(), + } + if _, err := c.client.Signal(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) { + ctx := context.Background() + pbDeadline, err := ptypes.TimestampProto(deadline) + if err != nil { + return nil, 0, err + } + req := &proto.ExecRequest{ + Deadline: pbDeadline, + Cmd: cmd, + Args: args, + } + + resp, err := c.client.Exec(ctx, req) + if err != nil { + return nil, 0, err + } + + return resp.Output, int(resp.ExitCode), nil +} diff --git a/plugins/executor/executor_plugin.go b/plugins/executor/executor_plugin.go index a31d9eb80..905dc6a30 100644 --- a/plugins/executor/executor_plugin.go +++ b/plugins/executor/executor_plugin.go @@ -1,185 +1,34 @@ package executor import ( - "encoding/gob" - "net/rpc" - "os" - "syscall" - "time" + "context" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/hashicorp/nomad/plugins/executor/proto" + "google.golang.org/grpc" ) -// Registering these types since we have to serialize and de-serialize the Task -// structs over the wire between drivers and the executor. -func init() { - gob.Register([]interface{}{}) - gob.Register(map[string]interface{}{}) - gob.Register([]map[string]string{}) - gob.Register([]map[string]int{}) - gob.Register(syscall.Signal(0x1)) -} - -type ExecutorRPC struct { - client *rpc.Client - logger hclog.Logger -} - -// LaunchCmdArgs wraps a user command and the args for the purposes of RPC -type LaunchArgs struct { - Cmd *executor.ExecCommand -} - -// ShutdownArgs wraps shutdown signal and grace period -type ShutdownArgs struct { - Signal string - GracePeriod time.Duration -} - -type ExecArgs struct { - Deadline time.Time - Name string - Args []string -} - -type ExecReturn struct { - Output []byte - Code int -} - -func (e *ExecutorRPC) Launch(cmd *executor.ExecCommand) (*executor.ProcessState, error) { - var ps *executor.ProcessState - err := e.client.Call("Plugin.Launch", LaunchArgs{Cmd: cmd}, &ps) - return ps, err -} - -func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) { - var ps executor.ProcessState - err := e.client.Call("Plugin.Wait", new(interface{}), &ps) - return &ps, err -} - -func (e *ExecutorRPC) Kill() error { - return e.client.Call("Plugin.Kill", new(interface{}), new(interface{})) -} - -func (e *ExecutorRPC) Shutdown(signal string, grace time.Duration) error { - return e.client.Call("Plugin.Shutdown", &ShutdownArgs{signal, grace}, new(interface{})) -} - -func (e *ExecutorRPC) UpdateResources(resources *executor.Resources) error { - return e.client.Call("Plugin.UpdateResources", resources, new(interface{})) -} - -func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) { - var version executor.ExecutorVersion - err := e.client.Call("Plugin.Version", new(interface{}), &version) - return &version, err -} - -func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) { - var resourceUsage cstructs.TaskResourceUsage - err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage) - return &resourceUsage, err -} - -func (e *ExecutorRPC) Signal(s os.Signal) error { - return e.client.Call("Plugin.Signal", &s, new(interface{})) -} - -func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) { - req := ExecArgs{ - Deadline: deadline, - Name: name, - Args: args, - } - var resp *ExecReturn - err := e.client.Call("Plugin.Exec", req, &resp) - if resp == nil { - return nil, 0, err - } - return resp.Output, resp.Code, err -} - -type ExecutorRPCServer struct { - Impl executor.Executor - logger hclog.Logger -} - -func (e *ExecutorRPCServer) Launch(args LaunchArgs, ps *executor.ProcessState) error { - state, err := e.Impl.Launch(args.Cmd) - if state != nil { - *ps = *state - } - return err -} - -func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error { - state, err := e.Impl.Wait() - if state != nil { - *ps = *state - } - return err -} - -func (e *ExecutorRPCServer) Shutdown(args ShutdownArgs, resp *interface{}) error { - return e.Impl.Shutdown(args.Signal, args.GracePeriod) -} - -func (e *ExecutorRPCServer) UpdateResources(args *executor.Resources, resp *interface{}) error { - return e.Impl.UpdateResources(args) -} - -func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error { - ver, err := e.Impl.Version() - if ver != nil { - *version = *ver - } - return err -} - -func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error { - ru, err := e.Impl.Stats() - if ru != nil { - *resourceUsage = *ru - } - return err -} - -func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error { - return e.Impl.Signal(args) -} - -func (e *ExecutorRPCServer) Exec(args ExecArgs, result *ExecReturn) error { - out, code, err := e.Impl.Exec(args.Deadline, args.Name, args.Args) - ret := &ExecReturn{ - Output: out, - Code: code, - } - *result = *ret - return err -} - type ExecutorPlugin struct { + // TODO: support backwards compatability with pre 0.9 NetRPC plugin + plugin.NetRPCUnsupportedPlugin logger hclog.Logger fsIsolation bool - Impl *ExecutorRPCServer } -func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { - if p.Impl == nil { - if p.fsIsolation { - p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutorWithIsolation(p.logger), logger: p.logger} - } else { - p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger), logger: p.logger} - } +func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + if p.fsIsolation { + proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: executor.NewExecutorWithIsolation(p.logger)}) + } else { + proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: executor.NewExecutor(p.logger)}) } - return p.Impl, nil + return nil } -func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { - return &ExecutorRPC{client: c, logger: p.logger}, nil +func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcExecutorClient{ + client: proto.NewExecutorClient(c), + doneCtx: ctx, + }, nil } diff --git a/plugins/executor/proto/executor.pb.go b/plugins/executor/proto/executor.pb.go new file mode 100644 index 000000000..5c345fa66 --- /dev/null +++ b/plugins/executor/proto/executor.pb.go @@ -0,0 +1,1201 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: plugins/executor/proto/executor.proto + +package proto + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import timestamp "github.com/golang/protobuf/ptypes/timestamp" +import proto1 "github.com/hashicorp/nomad/plugins/drivers/proto" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type LaunchRequest struct { + Cmd string `protobuf:"bytes,1,opt,name=cmd,proto3" json:"cmd,omitempty"` + Args []string `protobuf:"bytes,2,rep,name=args,proto3" json:"args,omitempty"` + Resources *Resources `protobuf:"bytes,3,opt,name=resources,proto3" json:"resources,omitempty"` + StdoutPath string `protobuf:"bytes,4,opt,name=stdout_path,json=stdoutPath,proto3" json:"stdout_path,omitempty"` + StderrPath string `protobuf:"bytes,5,opt,name=stderr_path,json=stderrPath,proto3" json:"stderr_path,omitempty"` + Env []string `protobuf:"bytes,6,rep,name=env,proto3" json:"env,omitempty"` + User string `protobuf:"bytes,7,opt,name=user,proto3" json:"user,omitempty"` + TaskDir string `protobuf:"bytes,8,opt,name=task_dir,json=taskDir,proto3" json:"task_dir,omitempty"` + ResourceLimits bool `protobuf:"varint,9,opt,name=resource_limits,json=resourceLimits,proto3" json:"resource_limits,omitempty"` + BasicProcessCgroup bool `protobuf:"varint,10,opt,name=basic_process_cgroup,json=basicProcessCgroup,proto3" json:"basic_process_cgroup,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LaunchRequest) Reset() { *m = LaunchRequest{} } +func (m *LaunchRequest) String() string { return proto.CompactTextString(m) } +func (*LaunchRequest) ProtoMessage() {} +func (*LaunchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{0} +} +func (m *LaunchRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LaunchRequest.Unmarshal(m, b) +} +func (m *LaunchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LaunchRequest.Marshal(b, m, deterministic) +} +func (dst *LaunchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LaunchRequest.Merge(dst, src) +} +func (m *LaunchRequest) XXX_Size() int { + return xxx_messageInfo_LaunchRequest.Size(m) +} +func (m *LaunchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LaunchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LaunchRequest proto.InternalMessageInfo + +func (m *LaunchRequest) GetCmd() string { + if m != nil { + return m.Cmd + } + return "" +} + +func (m *LaunchRequest) GetArgs() []string { + if m != nil { + return m.Args + } + return nil +} + +func (m *LaunchRequest) GetResources() *Resources { + if m != nil { + return m.Resources + } + return nil +} + +func (m *LaunchRequest) GetStdoutPath() string { + if m != nil { + return m.StdoutPath + } + return "" +} + +func (m *LaunchRequest) GetStderrPath() string { + if m != nil { + return m.StderrPath + } + return "" +} + +func (m *LaunchRequest) GetEnv() []string { + if m != nil { + return m.Env + } + return nil +} + +func (m *LaunchRequest) GetUser() string { + if m != nil { + return m.User + } + return "" +} + +func (m *LaunchRequest) GetTaskDir() string { + if m != nil { + return m.TaskDir + } + return "" +} + +func (m *LaunchRequest) GetResourceLimits() bool { + if m != nil { + return m.ResourceLimits + } + return false +} + +func (m *LaunchRequest) GetBasicProcessCgroup() bool { + if m != nil { + return m.BasicProcessCgroup + } + return false +} + +type LaunchResponse struct { + Process *ProcessState `protobuf:"bytes,1,opt,name=process,proto3" json:"process,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LaunchResponse) Reset() { *m = LaunchResponse{} } +func (m *LaunchResponse) String() string { return proto.CompactTextString(m) } +func (*LaunchResponse) ProtoMessage() {} +func (*LaunchResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{1} +} +func (m *LaunchResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LaunchResponse.Unmarshal(m, b) +} +func (m *LaunchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LaunchResponse.Marshal(b, m, deterministic) +} +func (dst *LaunchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LaunchResponse.Merge(dst, src) +} +func (m *LaunchResponse) XXX_Size() int { + return xxx_messageInfo_LaunchResponse.Size(m) +} +func (m *LaunchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LaunchResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LaunchResponse proto.InternalMessageInfo + +func (m *LaunchResponse) GetProcess() *ProcessState { + if m != nil { + return m.Process + } + return nil +} + +type WaitRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WaitRequest) Reset() { *m = WaitRequest{} } +func (m *WaitRequest) String() string { return proto.CompactTextString(m) } +func (*WaitRequest) ProtoMessage() {} +func (*WaitRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{2} +} +func (m *WaitRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WaitRequest.Unmarshal(m, b) +} +func (m *WaitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WaitRequest.Marshal(b, m, deterministic) +} +func (dst *WaitRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WaitRequest.Merge(dst, src) +} +func (m *WaitRequest) XXX_Size() int { + return xxx_messageInfo_WaitRequest.Size(m) +} +func (m *WaitRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WaitRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WaitRequest proto.InternalMessageInfo + +type WaitResponse struct { + Process *ProcessState `protobuf:"bytes,1,opt,name=process,proto3" json:"process,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WaitResponse) Reset() { *m = WaitResponse{} } +func (m *WaitResponse) String() string { return proto.CompactTextString(m) } +func (*WaitResponse) ProtoMessage() {} +func (*WaitResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{3} +} +func (m *WaitResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WaitResponse.Unmarshal(m, b) +} +func (m *WaitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WaitResponse.Marshal(b, m, deterministic) +} +func (dst *WaitResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WaitResponse.Merge(dst, src) +} +func (m *WaitResponse) XXX_Size() int { + return xxx_messageInfo_WaitResponse.Size(m) +} +func (m *WaitResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WaitResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_WaitResponse proto.InternalMessageInfo + +func (m *WaitResponse) GetProcess() *ProcessState { + if m != nil { + return m.Process + } + return nil +} + +type ShutdownRequest struct { + Signal string `protobuf:"bytes,1,opt,name=signal,proto3" json:"signal,omitempty"` + GracePeriod int64 `protobuf:"varint,2,opt,name=grace_period,json=gracePeriod,proto3" json:"grace_period,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ShutdownRequest) Reset() { *m = ShutdownRequest{} } +func (m *ShutdownRequest) String() string { return proto.CompactTextString(m) } +func (*ShutdownRequest) ProtoMessage() {} +func (*ShutdownRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{4} +} +func (m *ShutdownRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ShutdownRequest.Unmarshal(m, b) +} +func (m *ShutdownRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ShutdownRequest.Marshal(b, m, deterministic) +} +func (dst *ShutdownRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ShutdownRequest.Merge(dst, src) +} +func (m *ShutdownRequest) XXX_Size() int { + return xxx_messageInfo_ShutdownRequest.Size(m) +} +func (m *ShutdownRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ShutdownRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ShutdownRequest proto.InternalMessageInfo + +func (m *ShutdownRequest) GetSignal() string { + if m != nil { + return m.Signal + } + return "" +} + +func (m *ShutdownRequest) GetGracePeriod() int64 { + if m != nil { + return m.GracePeriod + } + return 0 +} + +type ShutdownResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ShutdownResponse) Reset() { *m = ShutdownResponse{} } +func (m *ShutdownResponse) String() string { return proto.CompactTextString(m) } +func (*ShutdownResponse) ProtoMessage() {} +func (*ShutdownResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{5} +} +func (m *ShutdownResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ShutdownResponse.Unmarshal(m, b) +} +func (m *ShutdownResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ShutdownResponse.Marshal(b, m, deterministic) +} +func (dst *ShutdownResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ShutdownResponse.Merge(dst, src) +} +func (m *ShutdownResponse) XXX_Size() int { + return xxx_messageInfo_ShutdownResponse.Size(m) +} +func (m *ShutdownResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ShutdownResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ShutdownResponse proto.InternalMessageInfo + +type UpdateResourcesRequest struct { + Resources *Resources `protobuf:"bytes,1,opt,name=resources,proto3" json:"resources,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UpdateResourcesRequest) Reset() { *m = UpdateResourcesRequest{} } +func (m *UpdateResourcesRequest) String() string { return proto.CompactTextString(m) } +func (*UpdateResourcesRequest) ProtoMessage() {} +func (*UpdateResourcesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{6} +} +func (m *UpdateResourcesRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UpdateResourcesRequest.Unmarshal(m, b) +} +func (m *UpdateResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UpdateResourcesRequest.Marshal(b, m, deterministic) +} +func (dst *UpdateResourcesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateResourcesRequest.Merge(dst, src) +} +func (m *UpdateResourcesRequest) XXX_Size() int { + return xxx_messageInfo_UpdateResourcesRequest.Size(m) +} +func (m *UpdateResourcesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateResourcesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateResourcesRequest proto.InternalMessageInfo + +func (m *UpdateResourcesRequest) GetResources() *Resources { + if m != nil { + return m.Resources + } + return nil +} + +type UpdateResourcesResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UpdateResourcesResponse) Reset() { *m = UpdateResourcesResponse{} } +func (m *UpdateResourcesResponse) String() string { return proto.CompactTextString(m) } +func (*UpdateResourcesResponse) ProtoMessage() {} +func (*UpdateResourcesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{7} +} +func (m *UpdateResourcesResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UpdateResourcesResponse.Unmarshal(m, b) +} +func (m *UpdateResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UpdateResourcesResponse.Marshal(b, m, deterministic) +} +func (dst *UpdateResourcesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateResourcesResponse.Merge(dst, src) +} +func (m *UpdateResourcesResponse) XXX_Size() int { + return xxx_messageInfo_UpdateResourcesResponse.Size(m) +} +func (m *UpdateResourcesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateResourcesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateResourcesResponse proto.InternalMessageInfo + +type VersionRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *VersionRequest) Reset() { *m = VersionRequest{} } +func (m *VersionRequest) String() string { return proto.CompactTextString(m) } +func (*VersionRequest) ProtoMessage() {} +func (*VersionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{8} +} +func (m *VersionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_VersionRequest.Unmarshal(m, b) +} +func (m *VersionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_VersionRequest.Marshal(b, m, deterministic) +} +func (dst *VersionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_VersionRequest.Merge(dst, src) +} +func (m *VersionRequest) XXX_Size() int { + return xxx_messageInfo_VersionRequest.Size(m) +} +func (m *VersionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_VersionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_VersionRequest proto.InternalMessageInfo + +type VersionResponse struct { + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *VersionResponse) Reset() { *m = VersionResponse{} } +func (m *VersionResponse) String() string { return proto.CompactTextString(m) } +func (*VersionResponse) ProtoMessage() {} +func (*VersionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{9} +} +func (m *VersionResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_VersionResponse.Unmarshal(m, b) +} +func (m *VersionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_VersionResponse.Marshal(b, m, deterministic) +} +func (dst *VersionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_VersionResponse.Merge(dst, src) +} +func (m *VersionResponse) XXX_Size() int { + return xxx_messageInfo_VersionResponse.Size(m) +} +func (m *VersionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_VersionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_VersionResponse proto.InternalMessageInfo + +func (m *VersionResponse) GetVersion() string { + if m != nil { + return m.Version + } + return "" +} + +type StatsRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatsRequest) Reset() { *m = StatsRequest{} } +func (m *StatsRequest) String() string { return proto.CompactTextString(m) } +func (*StatsRequest) ProtoMessage() {} +func (*StatsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{10} +} +func (m *StatsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatsRequest.Unmarshal(m, b) +} +func (m *StatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatsRequest.Marshal(b, m, deterministic) +} +func (dst *StatsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatsRequest.Merge(dst, src) +} +func (m *StatsRequest) XXX_Size() int { + return xxx_messageInfo_StatsRequest.Size(m) +} +func (m *StatsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StatsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StatsRequest proto.InternalMessageInfo + +type StatsResponse struct { + Stats *proto1.TaskStats `protobuf:"bytes,1,opt,name=stats,proto3" json:"stats,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatsResponse) Reset() { *m = StatsResponse{} } +func (m *StatsResponse) String() string { return proto.CompactTextString(m) } +func (*StatsResponse) ProtoMessage() {} +func (*StatsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{11} +} +func (m *StatsResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatsResponse.Unmarshal(m, b) +} +func (m *StatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatsResponse.Marshal(b, m, deterministic) +} +func (dst *StatsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatsResponse.Merge(dst, src) +} +func (m *StatsResponse) XXX_Size() int { + return xxx_messageInfo_StatsResponse.Size(m) +} +func (m *StatsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatsResponse proto.InternalMessageInfo + +func (m *StatsResponse) GetStats() *proto1.TaskStats { + if m != nil { + return m.Stats + } + return nil +} + +type SignalRequest struct { + Signal string `protobuf:"bytes,1,opt,name=signal,proto3" json:"signal,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SignalRequest) Reset() { *m = SignalRequest{} } +func (m *SignalRequest) String() string { return proto.CompactTextString(m) } +func (*SignalRequest) ProtoMessage() {} +func (*SignalRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{12} +} +func (m *SignalRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SignalRequest.Unmarshal(m, b) +} +func (m *SignalRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SignalRequest.Marshal(b, m, deterministic) +} +func (dst *SignalRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SignalRequest.Merge(dst, src) +} +func (m *SignalRequest) XXX_Size() int { + return xxx_messageInfo_SignalRequest.Size(m) +} +func (m *SignalRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SignalRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SignalRequest proto.InternalMessageInfo + +func (m *SignalRequest) GetSignal() string { + if m != nil { + return m.Signal + } + return "" +} + +type SignalResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SignalResponse) Reset() { *m = SignalResponse{} } +func (m *SignalResponse) String() string { return proto.CompactTextString(m) } +func (*SignalResponse) ProtoMessage() {} +func (*SignalResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{13} +} +func (m *SignalResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SignalResponse.Unmarshal(m, b) +} +func (m *SignalResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SignalResponse.Marshal(b, m, deterministic) +} +func (dst *SignalResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SignalResponse.Merge(dst, src) +} +func (m *SignalResponse) XXX_Size() int { + return xxx_messageInfo_SignalResponse.Size(m) +} +func (m *SignalResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SignalResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SignalResponse proto.InternalMessageInfo + +type ExecRequest struct { + Deadline *timestamp.Timestamp `protobuf:"bytes,1,opt,name=deadline,proto3" json:"deadline,omitempty"` + Cmd string `protobuf:"bytes,2,opt,name=cmd,proto3" json:"cmd,omitempty"` + Args []string `protobuf:"bytes,3,rep,name=args,proto3" json:"args,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExecRequest) Reset() { *m = ExecRequest{} } +func (m *ExecRequest) String() string { return proto.CompactTextString(m) } +func (*ExecRequest) ProtoMessage() {} +func (*ExecRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{14} +} +func (m *ExecRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecRequest.Unmarshal(m, b) +} +func (m *ExecRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecRequest.Marshal(b, m, deterministic) +} +func (dst *ExecRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecRequest.Merge(dst, src) +} +func (m *ExecRequest) XXX_Size() int { + return xxx_messageInfo_ExecRequest.Size(m) +} +func (m *ExecRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ExecRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecRequest proto.InternalMessageInfo + +func (m *ExecRequest) GetDeadline() *timestamp.Timestamp { + if m != nil { + return m.Deadline + } + return nil +} + +func (m *ExecRequest) GetCmd() string { + if m != nil { + return m.Cmd + } + return "" +} + +func (m *ExecRequest) GetArgs() []string { + if m != nil { + return m.Args + } + return nil +} + +type ExecResponse struct { + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` + ExitCode int32 `protobuf:"varint,2,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExecResponse) Reset() { *m = ExecResponse{} } +func (m *ExecResponse) String() string { return proto.CompactTextString(m) } +func (*ExecResponse) ProtoMessage() {} +func (*ExecResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{15} +} +func (m *ExecResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecResponse.Unmarshal(m, b) +} +func (m *ExecResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecResponse.Marshal(b, m, deterministic) +} +func (dst *ExecResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecResponse.Merge(dst, src) +} +func (m *ExecResponse) XXX_Size() int { + return xxx_messageInfo_ExecResponse.Size(m) +} +func (m *ExecResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ExecResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecResponse proto.InternalMessageInfo + +func (m *ExecResponse) GetOutput() []byte { + if m != nil { + return m.Output + } + return nil +} + +func (m *ExecResponse) GetExitCode() int32 { + if m != nil { + return m.ExitCode + } + return 0 +} + +type Resources struct { + Cpu int32 `protobuf:"varint,1,opt,name=cpu,proto3" json:"cpu,omitempty"` + MemoryMB int32 `protobuf:"varint,2,opt,name=memoryMB,proto3" json:"memoryMB,omitempty"` + DiskMB int32 `protobuf:"varint,3,opt,name=diskMB,proto3" json:"diskMB,omitempty"` + Iops int32 `protobuf:"varint,4,opt,name=iops,proto3" json:"iops,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Resources) Reset() { *m = Resources{} } +func (m *Resources) String() string { return proto.CompactTextString(m) } +func (*Resources) ProtoMessage() {} +func (*Resources) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{16} +} +func (m *Resources) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Resources.Unmarshal(m, b) +} +func (m *Resources) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Resources.Marshal(b, m, deterministic) +} +func (dst *Resources) XXX_Merge(src proto.Message) { + xxx_messageInfo_Resources.Merge(dst, src) +} +func (m *Resources) XXX_Size() int { + return xxx_messageInfo_Resources.Size(m) +} +func (m *Resources) XXX_DiscardUnknown() { + xxx_messageInfo_Resources.DiscardUnknown(m) +} + +var xxx_messageInfo_Resources proto.InternalMessageInfo + +func (m *Resources) GetCpu() int32 { + if m != nil { + return m.Cpu + } + return 0 +} + +func (m *Resources) GetMemoryMB() int32 { + if m != nil { + return m.MemoryMB + } + return 0 +} + +func (m *Resources) GetDiskMB() int32 { + if m != nil { + return m.DiskMB + } + return 0 +} + +func (m *Resources) GetIops() int32 { + if m != nil { + return m.Iops + } + return 0 +} + +type ProcessState struct { + Pid int32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` + ExitCode int32 `protobuf:"varint,2,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` + Signal int32 `protobuf:"varint,3,opt,name=signal,proto3" json:"signal,omitempty"` + Time *timestamp.Timestamp `protobuf:"bytes,4,opt,name=time,proto3" json:"time,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ProcessState) Reset() { *m = ProcessState{} } +func (m *ProcessState) String() string { return proto.CompactTextString(m) } +func (*ProcessState) ProtoMessage() {} +func (*ProcessState) Descriptor() ([]byte, []int) { + return fileDescriptor_executor_e0a843141d496249, []int{17} +} +func (m *ProcessState) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ProcessState.Unmarshal(m, b) +} +func (m *ProcessState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ProcessState.Marshal(b, m, deterministic) +} +func (dst *ProcessState) XXX_Merge(src proto.Message) { + xxx_messageInfo_ProcessState.Merge(dst, src) +} +func (m *ProcessState) XXX_Size() int { + return xxx_messageInfo_ProcessState.Size(m) +} +func (m *ProcessState) XXX_DiscardUnknown() { + xxx_messageInfo_ProcessState.DiscardUnknown(m) +} + +var xxx_messageInfo_ProcessState proto.InternalMessageInfo + +func (m *ProcessState) GetPid() int32 { + if m != nil { + return m.Pid + } + return 0 +} + +func (m *ProcessState) GetExitCode() int32 { + if m != nil { + return m.ExitCode + } + return 0 +} + +func (m *ProcessState) GetSignal() int32 { + if m != nil { + return m.Signal + } + return 0 +} + +func (m *ProcessState) GetTime() *timestamp.Timestamp { + if m != nil { + return m.Time + } + return nil +} + +func init() { + proto.RegisterType((*LaunchRequest)(nil), "hashicorp.nomad.plugins.executor.proto.LaunchRequest") + proto.RegisterType((*LaunchResponse)(nil), "hashicorp.nomad.plugins.executor.proto.LaunchResponse") + proto.RegisterType((*WaitRequest)(nil), "hashicorp.nomad.plugins.executor.proto.WaitRequest") + proto.RegisterType((*WaitResponse)(nil), "hashicorp.nomad.plugins.executor.proto.WaitResponse") + proto.RegisterType((*ShutdownRequest)(nil), "hashicorp.nomad.plugins.executor.proto.ShutdownRequest") + proto.RegisterType((*ShutdownResponse)(nil), "hashicorp.nomad.plugins.executor.proto.ShutdownResponse") + proto.RegisterType((*UpdateResourcesRequest)(nil), "hashicorp.nomad.plugins.executor.proto.UpdateResourcesRequest") + proto.RegisterType((*UpdateResourcesResponse)(nil), "hashicorp.nomad.plugins.executor.proto.UpdateResourcesResponse") + proto.RegisterType((*VersionRequest)(nil), "hashicorp.nomad.plugins.executor.proto.VersionRequest") + proto.RegisterType((*VersionResponse)(nil), "hashicorp.nomad.plugins.executor.proto.VersionResponse") + proto.RegisterType((*StatsRequest)(nil), "hashicorp.nomad.plugins.executor.proto.StatsRequest") + proto.RegisterType((*StatsResponse)(nil), "hashicorp.nomad.plugins.executor.proto.StatsResponse") + proto.RegisterType((*SignalRequest)(nil), "hashicorp.nomad.plugins.executor.proto.SignalRequest") + proto.RegisterType((*SignalResponse)(nil), "hashicorp.nomad.plugins.executor.proto.SignalResponse") + proto.RegisterType((*ExecRequest)(nil), "hashicorp.nomad.plugins.executor.proto.ExecRequest") + proto.RegisterType((*ExecResponse)(nil), "hashicorp.nomad.plugins.executor.proto.ExecResponse") + proto.RegisterType((*Resources)(nil), "hashicorp.nomad.plugins.executor.proto.Resources") + proto.RegisterType((*ProcessState)(nil), "hashicorp.nomad.plugins.executor.proto.ProcessState") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// ExecutorClient is the client API for Executor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ExecutorClient interface { + Launch(ctx context.Context, in *LaunchRequest, opts ...grpc.CallOption) (*LaunchResponse, error) + Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) + Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) + UpdateResources(ctx context.Context, in *UpdateResourcesRequest, opts ...grpc.CallOption) (*UpdateResourcesResponse, error) + Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error) + Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) + Signal(ctx context.Context, in *SignalRequest, opts ...grpc.CallOption) (*SignalResponse, error) + Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) +} + +type executorClient struct { + cc *grpc.ClientConn +} + +func NewExecutorClient(cc *grpc.ClientConn) ExecutorClient { + return &executorClient{cc} +} + +func (c *executorClient) Launch(ctx context.Context, in *LaunchRequest, opts ...grpc.CallOption) (*LaunchResponse, error) { + out := new(LaunchResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Launch", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) { + out := new(WaitResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Wait", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) { + out := new(ShutdownResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Shutdown", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) UpdateResources(ctx context.Context, in *UpdateResourcesRequest, opts ...grpc.CallOption) (*UpdateResourcesResponse, error) { + out := new(UpdateResourcesResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/UpdateResources", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error) { + out := new(VersionResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Version", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) { + out := new(StatsResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Stats", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Signal(ctx context.Context, in *SignalRequest, opts ...grpc.CallOption) (*SignalResponse, error) { + out := new(SignalResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Signal", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *executorClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) { + out := new(ExecResponse) + err := c.cc.Invoke(ctx, "/hashicorp.nomad.plugins.executor.proto.Executor/Exec", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ExecutorServer is the server API for Executor service. +type ExecutorServer interface { + Launch(context.Context, *LaunchRequest) (*LaunchResponse, error) + Wait(context.Context, *WaitRequest) (*WaitResponse, error) + Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) + UpdateResources(context.Context, *UpdateResourcesRequest) (*UpdateResourcesResponse, error) + Version(context.Context, *VersionRequest) (*VersionResponse, error) + Stats(context.Context, *StatsRequest) (*StatsResponse, error) + Signal(context.Context, *SignalRequest) (*SignalResponse, error) + Exec(context.Context, *ExecRequest) (*ExecResponse, error) +} + +func RegisterExecutorServer(s *grpc.Server, srv ExecutorServer) { + s.RegisterService(&_Executor_serviceDesc, srv) +} + +func _Executor_Launch_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LaunchRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Launch(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Launch", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Launch(ctx, req.(*LaunchRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Wait_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WaitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Wait(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Wait", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Wait(ctx, req.(*WaitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ShutdownRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Shutdown(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Shutdown", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Shutdown(ctx, req.(*ShutdownRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_UpdateResources_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateResourcesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).UpdateResources(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/UpdateResources", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).UpdateResources(ctx, req.(*UpdateResourcesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(VersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Version(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Version", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Version(ctx, req.(*VersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Stats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Stats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Stats(ctx, req.(*StatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Signal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SignalRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Signal(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Signal", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Signal(ctx, req.(*SignalRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Executor_Exec_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Exec(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.nomad.plugins.executor.proto.Executor/Exec", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Exec(ctx, req.(*ExecRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Executor_serviceDesc = grpc.ServiceDesc{ + ServiceName: "hashicorp.nomad.plugins.executor.proto.Executor", + HandlerType: (*ExecutorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Launch", + Handler: _Executor_Launch_Handler, + }, + { + MethodName: "Wait", + Handler: _Executor_Wait_Handler, + }, + { + MethodName: "Shutdown", + Handler: _Executor_Shutdown_Handler, + }, + { + MethodName: "UpdateResources", + Handler: _Executor_UpdateResources_Handler, + }, + { + MethodName: "Version", + Handler: _Executor_Version_Handler, + }, + { + MethodName: "Stats", + Handler: _Executor_Stats_Handler, + }, + { + MethodName: "Signal", + Handler: _Executor_Signal_Handler, + }, + { + MethodName: "Exec", + Handler: _Executor_Exec_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "plugins/executor/proto/executor.proto", +} + +func init() { + proto.RegisterFile("plugins/executor/proto/executor.proto", fileDescriptor_executor_e0a843141d496249) +} + +var fileDescriptor_executor_e0a843141d496249 = []byte{ + // 869 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x5d, 0x6f, 0xe3, 0x44, + 0x14, 0x5d, 0x37, 0x75, 0xe2, 0xdc, 0xa4, 0x4d, 0x34, 0x42, 0xc5, 0x6b, 0x1e, 0x36, 0x58, 0x82, + 0x8d, 0x84, 0xe4, 0x2c, 0xdd, 0x2f, 0x5e, 0x00, 0xa9, 0x65, 0x79, 0xea, 0x42, 0xe5, 0x2e, 0xac, + 0xc4, 0x03, 0xc1, 0xb5, 0x07, 0x67, 0x94, 0xd8, 0x63, 0x66, 0xc6, 0xa1, 0x95, 0x90, 0x78, 0xe2, + 0x1f, 0xf0, 0x13, 0xf9, 0x11, 0x3c, 0xa2, 0xf9, 0x72, 0x93, 0x2e, 0x14, 0x07, 0xb4, 0x4f, 0x99, + 0x7b, 0x73, 0xcf, 0x3d, 0xf7, 0xce, 0x1c, 0x1f, 0xf8, 0xa0, 0x5a, 0xd5, 0x39, 0x29, 0xf9, 0x0c, + 0x5f, 0xe1, 0xb4, 0x16, 0x94, 0xcd, 0x2a, 0x46, 0x05, 0x6d, 0xc2, 0x48, 0x85, 0xe8, 0xc3, 0x45, + 0xc2, 0x17, 0x24, 0xa5, 0xac, 0x8a, 0x4a, 0x5a, 0x24, 0x59, 0x64, 0x60, 0xd1, 0x76, 0x5d, 0xf0, + 0x20, 0xa7, 0x34, 0x5f, 0x61, 0xdd, 0xe4, 0xb2, 0xfe, 0x71, 0x26, 0x48, 0x81, 0xb9, 0x48, 0x8a, + 0xca, 0x14, 0x7c, 0x9a, 0x13, 0xb1, 0xa8, 0x2f, 0xa3, 0x94, 0x16, 0xb3, 0xa6, 0xe7, 0x4c, 0xf5, + 0x9c, 0xd9, 0x51, 0x32, 0x46, 0xd6, 0x98, 0x71, 0x33, 0x89, 0x8e, 0x34, 0x3c, 0xfc, 0x63, 0x0f, + 0x0e, 0xce, 0x92, 0xba, 0x4c, 0x17, 0x31, 0xfe, 0xa9, 0xc6, 0x5c, 0xa0, 0x31, 0x74, 0xd2, 0x22, + 0xf3, 0x9d, 0x89, 0x33, 0xed, 0xc7, 0xf2, 0x88, 0x10, 0xec, 0x27, 0x2c, 0xe7, 0xfe, 0xde, 0xa4, + 0x33, 0xed, 0xc7, 0xea, 0x8c, 0xbe, 0x86, 0x3e, 0xc3, 0x9c, 0xd6, 0x2c, 0xc5, 0xdc, 0xef, 0x4c, + 0x9c, 0xe9, 0xe0, 0xf8, 0xe3, 0xa8, 0xdd, 0x4e, 0x51, 0x6c, 0x81, 0xf1, 0x4d, 0x0f, 0xf4, 0x00, + 0x06, 0x5c, 0x64, 0xb4, 0x16, 0xf3, 0x2a, 0x11, 0x0b, 0x7f, 0x5f, 0xd1, 0x83, 0x4e, 0x9d, 0x27, + 0x62, 0x61, 0x0a, 0x30, 0x63, 0xba, 0xc0, 0x6d, 0x0a, 0x30, 0x63, 0xaa, 0x60, 0x0c, 0x1d, 0x5c, + 0xae, 0xfd, 0xae, 0x9a, 0x52, 0x1e, 0xe5, 0xe0, 0x35, 0xc7, 0xcc, 0xef, 0xa9, 0x5a, 0x75, 0x46, + 0xf7, 0xc1, 0x13, 0x09, 0x5f, 0xce, 0x33, 0xc2, 0x7c, 0x4f, 0xe5, 0x7b, 0x32, 0xfe, 0x82, 0x30, + 0xf4, 0x10, 0x46, 0x76, 0x9e, 0xf9, 0x8a, 0x14, 0x44, 0x70, 0xbf, 0x3f, 0x71, 0xa6, 0x5e, 0x7c, + 0x68, 0xd3, 0x67, 0x2a, 0x8b, 0x1e, 0xc1, 0x3b, 0x97, 0x09, 0x27, 0xe9, 0xbc, 0x62, 0x34, 0xc5, + 0x9c, 0xcf, 0xd3, 0x9c, 0xd1, 0xba, 0xf2, 0x41, 0x55, 0x23, 0xf5, 0xdf, 0xb9, 0xfe, 0xeb, 0x54, + 0xfd, 0x13, 0xfe, 0x00, 0x87, 0xf6, 0x96, 0x79, 0x45, 0x4b, 0x8e, 0xd1, 0x57, 0xd0, 0x33, 0x68, + 0x75, 0xd5, 0x83, 0xe3, 0x27, 0x6d, 0xaf, 0xcf, 0x74, 0xbe, 0x10, 0x89, 0xc0, 0xb1, 0x6d, 0x12, + 0x1e, 0xc0, 0xe0, 0x75, 0x42, 0x84, 0x79, 0xc5, 0xf0, 0x7b, 0x18, 0xea, 0xf0, 0x2d, 0xd1, 0x9d, + 0xc1, 0xe8, 0x62, 0x51, 0x8b, 0x8c, 0xfe, 0x5c, 0x5a, 0xe1, 0x1c, 0x41, 0x97, 0x93, 0xbc, 0x4c, + 0x56, 0x46, 0x3b, 0x26, 0x42, 0xef, 0xc3, 0x30, 0x67, 0x49, 0x8a, 0xe7, 0x15, 0x66, 0x84, 0x66, + 0xfe, 0xde, 0xc4, 0x99, 0x76, 0xe2, 0x81, 0xca, 0x9d, 0xab, 0x54, 0x88, 0x60, 0x7c, 0xd3, 0x4d, + 0x4f, 0x1c, 0x12, 0x38, 0xfa, 0xa6, 0xca, 0x24, 0x69, 0x23, 0x17, 0x43, 0xb4, 0xa5, 0x3d, 0xe7, + 0xff, 0x6b, 0x2f, 0xbc, 0x0f, 0xef, 0xbe, 0x41, 0x65, 0xa6, 0x18, 0xc3, 0xe1, 0xb7, 0x98, 0x71, + 0x42, 0xed, 0x9a, 0xe1, 0x47, 0x30, 0x6a, 0x32, 0xe6, 0x72, 0x7d, 0xe8, 0xad, 0x75, 0xca, 0xac, + 0x6e, 0xc3, 0xf0, 0x10, 0x86, 0xf2, 0xe2, 0xec, 0xe8, 0xe1, 0x6b, 0x38, 0x30, 0xb1, 0x81, 0x7e, + 0x09, 0x2e, 0x97, 0x09, 0xb3, 0xc7, 0xa3, 0x7f, 0xdc, 0xc3, 0x7c, 0xc3, 0x66, 0x8d, 0x57, 0x09, + 0x5f, 0xea, 0x46, 0x1a, 0x1e, 0x3e, 0x84, 0x83, 0x0b, 0x75, 0xdd, 0xff, 0xf2, 0x1a, 0x72, 0x21, + 0x5b, 0x68, 0x56, 0x5c, 0xc2, 0xe0, 0xc5, 0x15, 0x4e, 0x2d, 0xf0, 0x19, 0x78, 0x19, 0x4e, 0xb2, + 0x15, 0x29, 0xb1, 0x19, 0x2a, 0x88, 0xb4, 0x09, 0x45, 0xd6, 0x84, 0xa2, 0x57, 0xd6, 0x84, 0xe2, + 0xa6, 0xd6, 0xfa, 0xc6, 0xde, 0x9b, 0xbe, 0xd1, 0xb9, 0xf1, 0x8d, 0xf0, 0x14, 0x86, 0x9a, 0xcc, + 0xec, 0x7f, 0x04, 0x5d, 0x5a, 0x8b, 0xaa, 0x16, 0x8a, 0x6b, 0x18, 0x9b, 0x08, 0xbd, 0x07, 0x7d, + 0x7c, 0x45, 0xc4, 0x3c, 0xa5, 0x19, 0x56, 0x3d, 0xdd, 0xd8, 0x93, 0x89, 0x53, 0x9a, 0xe1, 0x10, + 0x43, 0xbf, 0x79, 0x29, 0xc5, 0x5b, 0xd5, 0x0a, 0xee, 0xc6, 0xf2, 0x88, 0x02, 0xf0, 0x0a, 0x5c, + 0x50, 0x76, 0xfd, 0xf2, 0xc4, 0x42, 0x6d, 0x2c, 0xf9, 0x32, 0xc2, 0x97, 0x2f, 0x4f, 0x94, 0x69, + 0xb9, 0xb1, 0x89, 0xe4, 0xac, 0x84, 0x56, 0x5c, 0xf9, 0x8e, 0x1b, 0xab, 0x73, 0xf8, 0x9b, 0x03, + 0xc3, 0x4d, 0xf5, 0x4b, 0xaa, 0x8a, 0x64, 0x96, 0xaa, 0x22, 0xd9, 0x9d, 0x63, 0x6e, 0x3c, 0x81, + 0xe1, 0x32, 0x1f, 0x44, 0x04, 0xfb, 0xd2, 0xc5, 0x15, 0xd7, 0xdd, 0xb7, 0xab, 0xea, 0x8e, 0xff, + 0xec, 0x81, 0xf7, 0xc2, 0xc8, 0x18, 0x5d, 0x43, 0x57, 0x3b, 0x09, 0x7a, 0xda, 0x56, 0xf3, 0x5b, + 0xfe, 0x1e, 0x3c, 0xdb, 0x15, 0x66, 0x64, 0x72, 0x0f, 0x71, 0xd8, 0x97, 0x9e, 0x82, 0x1e, 0xb7, + 0xed, 0xb0, 0x61, 0x48, 0xc1, 0x93, 0xdd, 0x40, 0x0d, 0xe9, 0xaf, 0xe0, 0x59, 0x6b, 0x40, 0xcf, + 0xdb, 0xf6, 0xb8, 0x65, 0x4d, 0xc1, 0x27, 0xbb, 0x03, 0x9b, 0x01, 0x7e, 0x77, 0x60, 0x74, 0xcb, + 0x1d, 0xd0, 0x67, 0x6d, 0xfb, 0xfd, 0xbd, 0x83, 0x05, 0x9f, 0xff, 0x67, 0x7c, 0x33, 0xd6, 0x2f, + 0xd0, 0x33, 0x36, 0x84, 0x5a, 0xbf, 0xe8, 0xb6, 0x93, 0x05, 0xcf, 0x77, 0xc6, 0x35, 0xec, 0x6b, + 0x70, 0x95, 0xfd, 0xa0, 0xd6, 0xcf, 0xba, 0x69, 0x83, 0xc1, 0xd3, 0x1d, 0x51, 0x0d, 0xef, 0x35, + 0x74, 0xb5, 0x7b, 0xb5, 0x57, 0xff, 0x96, 0x2d, 0xb6, 0x57, 0xff, 0x2d, 0x93, 0x54, 0xea, 0x97, + 0x1f, 0x61, 0x7b, 0xf5, 0x6f, 0x98, 0x6a, 0x7b, 0xf5, 0x6f, 0x9a, 0x63, 0x78, 0xef, 0xa4, 0xf7, + 0x9d, 0xab, 0x6d, 0xa1, 0xab, 0x7e, 0x1e, 0xff, 0x15, 0x00, 0x00, 0xff, 0xff, 0xa6, 0x03, 0x32, + 0x69, 0x5f, 0x0a, 0x00, 0x00, +} diff --git a/plugins/executor/proto/executor.proto b/plugins/executor/proto/executor.proto new file mode 100644 index 000000000..1ec576caa --- /dev/null +++ b/plugins/executor/proto/executor.proto @@ -0,0 +1,96 @@ +syntax = "proto3"; +package hashicorp.nomad.plugins.executor.proto; +option go_package = "proto"; + +import "google/protobuf/timestamp.proto"; +import "github.com/hashicorp/nomad/plugins/drivers/proto/driver.proto"; + +service Executor { + rpc Launch(LaunchRequest) returns (LaunchResponse) {} + rpc Wait(WaitRequest) returns (WaitResponse) {} + rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {} + rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesResponse) {} + rpc Version(VersionRequest) returns (VersionResponse) {} + rpc Stats(StatsRequest) returns (StatsResponse) {} + rpc Signal(SignalRequest) returns (SignalResponse) {} + rpc Exec(ExecRequest) returns (ExecResponse) {} +} + +message LaunchRequest { + string cmd = 1; + repeated string args = 2; + Resources resources = 3; + string stdout_path = 4; + string stderr_path = 5; + repeated string env = 6; + string user = 7; + string task_dir = 8; + bool resource_limits = 9; + bool basic_process_cgroup = 10; +} + +message LaunchResponse { + ProcessState process = 1; +} + +message WaitRequest {} + +message WaitResponse{ + ProcessState process = 1; +} + +message ShutdownRequest { + string signal = 1; + int64 grace_period = 2; +} + +message ShutdownResponse {} + +message UpdateResourcesRequest{ + Resources resources = 1; +} + +message UpdateResourcesResponse {} + +message VersionRequest {} + +message VersionResponse{ + string version = 1; +} + +message StatsRequest {} + +message StatsResponse { + hashicorp.nomad.plugins.drivers.proto.TaskStats stats = 1; +} + +message SignalRequest { + string signal = 1; +} + +message SignalResponse {} + +message ExecRequest { + google.protobuf.Timestamp deadline = 1; + string cmd = 2; + repeated string args = 3; +} + +message ExecResponse { + bytes output = 1; + int32 exit_code = 2; +} + +message Resources { + int32 cpu = 1; + int32 memoryMB = 2; + int32 diskMB = 3; + int32 iops = 4; +} + +message ProcessState { + int32 pid = 1; + int32 exit_code = 2; + int32 signal = 3; + google.protobuf.Timestamp time = 4; +} diff --git a/plugins/executor/server.go b/plugins/executor/server.go new file mode 100644 index 000000000..d3146dd70 --- /dev/null +++ b/plugins/executor/server.go @@ -0,0 +1,132 @@ +package executor + +import ( + "fmt" + "time" + + "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/consul-template/signals" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/executor/proto" + "golang.org/x/net/context" +) + +type grpcExecutorServer struct { + impl structs.Executor + doneCtx context.Context +} + +func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) { + ps, err := s.impl.Launch(&structs.ExecCommand{ + Cmd: req.Cmd, + Args: req.Args, + Resources: resourcesFromProto(req.Resources), + StdoutPath: req.StdoutPath, + StderrPath: req.StderrPath, + Env: req.Env, + User: req.User, + TaskDir: req.TaskDir, + ResourceLimits: req.ResourceLimits, + BasicProcessCgroup: req.BasicProcessCgroup, + }) + + if err != nil { + return nil, err + } + + process, err := processStateToProto(ps) + if err != nil { + return nil, err + } + + return &proto.LaunchResponse{ + Process: process, + }, nil +} + +func (s *grpcExecutorServer) Wait(ctx context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) { + ps, err := s.impl.Wait(ctx) + if err != nil { + return nil, err + } + + process, err := processStateToProto(ps) + if err != nil { + return nil, err + } + + return &proto.WaitResponse{ + Process: process, + }, nil +} + +func (s *grpcExecutorServer) Shutdown(ctx context.Context, req *proto.ShutdownRequest) (*proto.ShutdownResponse, error) { + if err := s.impl.Shutdown(req.Signal, time.Duration(req.GracePeriod)); err != nil { + return nil, err + } + + return &proto.ShutdownResponse{}, nil +} + +func (s *grpcExecutorServer) UpdateResources(ctx context.Context, req *proto.UpdateResourcesRequest) (*proto.UpdateResourcesResponse, error) { + if err := s.impl.UpdateResources(resourcesFromProto(req.Resources)); err != nil { + return nil, err + } + + return &proto.UpdateResourcesResponse{}, nil +} + +func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*proto.VersionResponse, error) { + v, err := s.impl.Version() + if err != nil { + return nil, err + } + + return &proto.VersionResponse{ + Version: v.Version, + }, nil +} + +func (s *grpcExecutorServer) Stats(context.Context, *proto.StatsRequest) (*proto.StatsResponse, error) { + stats, err := s.impl.Stats() + if err != nil { + return nil, err + } + + pbStats, err := drivers.TaskStatsToProto(stats) + if err != nil { + return nil, err + } + + return &proto.StatsResponse{ + Stats: pbStats, + }, nil +} + +func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) { + if sig, ok := signals.SignalLookup[req.Signal]; ok { + if err := s.impl.Signal(sig); err != nil { + return nil, err + } + return &proto.SignalResponse{}, nil + } + return nil, fmt.Errorf("invalid signal sent by client") +} + +func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (*proto.ExecResponse, error) { + deadline, err := ptypes.Timestamp(req.Deadline) + if err != nil { + return nil, err + } + + out, exit, err := s.impl.Exec(deadline, req.Cmd, req.Args) + if err != nil { + return nil, err + } + + return &proto.ExecResponse{ + Output: out, + ExitCode: int32(exit), + }, nil +} diff --git a/plugins/executor/utils.go b/plugins/executor/utils.go new file mode 100644 index 000000000..e511c42a1 --- /dev/null +++ b/plugins/executor/utils.go @@ -0,0 +1,62 @@ +package executor + +import ( + "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/plugins/executor/proto" +) + +func processStateToProto(ps *structs.ProcessState) (*proto.ProcessState, error) { + timestamp, err := ptypes.TimestampProto(ps.Time) + if err != nil { + return nil, err + } + pb := &proto.ProcessState{ + Pid: int32(ps.Pid), + ExitCode: int32(ps.ExitCode), + Signal: int32(ps.Signal), + Time: timestamp, + } + + return pb, nil +} + +func processStateFromProto(pb *proto.ProcessState) (*structs.ProcessState, error) { + timestamp, err := ptypes.Timestamp(pb.Time) + if err != nil { + return nil, err + } + + return &structs.ProcessState{ + Pid: int(pb.Pid), + ExitCode: int(pb.ExitCode), + Signal: int(pb.Signal), + Time: timestamp, + }, nil +} + +func resourcesToProto(r *structs.Resources) *proto.Resources { + if r == nil { + return &proto.Resources{} + } + + return &proto.Resources{ + Cpu: int32(r.CPU), + MemoryMB: int32(r.MemoryMB), + DiskMB: int32(r.DiskMB), + Iops: int32(r.IOPS), + } +} + +func resourcesFromProto(pb *proto.Resources) *structs.Resources { + if pb == nil { + return &structs.Resources{} + } + + return &structs.Resources{ + CPU: int(pb.Cpu), + MemoryMB: int(pb.MemoryMB), + DiskMB: int(pb.DiskMB), + IOPS: int(pb.Iops), + } +}