From 0f355e8f87968b12a26fc9f4d2d23da8d9eeff1f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 17 Mar 2016 02:53:31 -0700 Subject: [PATCH] Introduced a method in executor to launch syslog server --- client/driver/docker.go | 28 +-- client/driver/docker_test.go | 4 +- client/driver/exec.go | 21 +- client/driver/executor/executor.go | 230 ++++++++++++------ client/driver/executor/executor_linux.go | 18 +- client/driver/executor/executor_posix.go | 44 ++++ client/driver/executor/executor_test.go | 47 ++-- client/driver/executor/executor_windows.go | 5 + client/driver/executor_plugin.go | 36 +++ client/driver/java.go | 21 +- .../driver/logging/syslog_server_windows.go | 10 + client/driver/qemu.go | 10 +- client/driver/raw_exec.go | 10 +- client/driver/rkt.go | 11 +- client/driver/structs/structs.go | 4 + 15 files changed, 348 insertions(+), 151 deletions(-) create mode 100644 client/driver/executor/executor_posix.go create mode 100644 client/driver/executor/executor_windows.go create mode 100644 client/driver/logging/syslog_server_windows.go diff --git a/client/driver/docker.go b/client/driver/docker.go index 7c04daacb..5829b7ad5 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -20,7 +20,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/driver/logging" + "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" @@ -101,7 +101,7 @@ type dockerPID struct { type DockerHandle struct { pluginClient *plugin.Client - logCollector logging.LogCollector + executor executor.Executor client *docker.Client logger *log.Logger cleanupContainer bool @@ -533,23 +533,23 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name)) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) pluginConfig := &plugin.ClientConfig{ - Cmd: exec.Command(bin, "syslog", pluginLogFile), + Cmd: exec.Command(bin, "executor", pluginLogFile), } - logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } - logCollectorCtx := &logging.LogCollectorContext{ - TaskName: task.Name, + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + Task: task, AllocDir: ctx.AllocDir, - LogConfig: task.LogConfig, PortLowerBound: d.config.ClientMinPort, PortUpperBound: d.config.ClientMaxPort, } - ss, err := logCollector.LaunchCollector(logCollectorCtx) + ss, err := exec.LaunchSyslogServer(executorCtx) if err != nil { return nil, fmt.Errorf("failed to start syslog collector: %v", err) } @@ -629,7 +629,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle maxKill := d.DriverContext.config.MaxKillTimeout h := &DockerHandle{ client: client, - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, @@ -686,7 +686,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er if !found { return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err) } - logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err) if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil { @@ -698,7 +698,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er // Return a driver handle h := &DockerHandle{ client: client, - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, @@ -743,7 +743,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { + if err := h.executor.UpdateTask(task); err != nil { h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) } @@ -824,7 +824,7 @@ func (h *DockerHandle) run() { close(h.waitCh) // Shutdown the syslog collector - if err := h.logCollector.Exit(); err != nil { + if err := h.executor.Exit(); err != nil { h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err) } h.pluginClient.Kill() diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 9e641c219..0410855d2 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -145,7 +145,7 @@ func TestDockerDriver_Handle(t *testing.T) { pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "syslog", f.Name()), } - logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{}) + exec, pluginClient, err := createExecutor(pluginConfig, os.Stdout, &config.Config{}) if err != nil { t.Fatalf("got an err: %v", err) } @@ -154,7 +154,7 @@ func TestDockerDriver_Handle(t *testing.T) { h := &DockerHandle{ version: "version", imageID: "imageid", - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, containerID: "containerid", killTimeout: 5 * time.Nanosecond, diff --git a/client/driver/exec.go b/client/driver/exec.go index c832ab9de..c3e99a3e8 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -100,16 +100,17 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, - ResourceLimits: true, - FSIsolation: true, - UnprivilegedUser: true, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } - ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) + ps, err := exec.LaunchCmd(&executor.ExecCommand{ + Cmd: command, + Args: driverConfig.Args, + FSIsolation: true, + ResourceLimits: true, + User: cstructs.DefaultUnpriviledgedUser, + }, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -217,7 +218,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { func (h *execHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 3c0596bdb..9ace1cb72 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,7 +2,9 @@ package executor import ( "fmt" + "io/ioutil" "log" + "net" "os" "os/exec" "runtime" @@ -21,6 +23,18 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) + LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) + Wait() (*ProcessState, error) + ShutDown() error + Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error + UpdateTask(task *structs.Task) error +} + // ExecutorContext holds context to configure the command user // wants to run and isolate it type ExecutorContext struct { @@ -31,33 +45,36 @@ type ExecutorContext struct { // the task AllocDir *allocdir.AllocDir - // TaskName is the name of the Task - TaskName string + // Task is the task whose executor is being launched + Task *structs.Task - // TaskResources are the resource constraints for the Task - TaskResources *structs.Resources + // PortUpperBound is the upper bound of the ports that we can use to start + // the syslog server + PortUpperBound uint - // FSIsolation is a flag for drivers to impose file system - // isolation on certain platforms - FSIsolation bool - - // ResourceLimits is a flag for drivers to impose resource - // contraints on a Task on certain platforms - ResourceLimits bool - - // UnprivilegedUser is a flag for drivers to make the process - // run as nobody - UnprivilegedUser bool - - // LogConfig provides the configuration related to log rotation - LogConfig *structs.LogConfig + // PortLowerBound is the lower bound of the ports that we can use to start + // the syslog server + PortLowerBound uint } -// ExecCommand holds the user command and args. It's a lightweight replacement -// of exec.Cmd for serialization purposes. +// ExecCommand holds the user command, args, and other isolation related +// settings. type ExecCommand struct { - Cmd string + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. Args []string + + // FSIsolation determines whether the command would be run in a chroot. + FSIsolation bool + + // User is the user which the executor uses to run the command. + User string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool } // ProcessState holds information about the state of a user process. @@ -69,37 +86,44 @@ type ProcessState struct { Time time.Time } -// Executor is the interface which allows a driver to launch and supervise -// a process -type Executor interface { - LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) - Wait() (*ProcessState, error) - ShutDown() error - Exit() error - UpdateLogConfig(logConfig *structs.LogConfig) error +// SyslogServerState holds the address and islation information of a launched +// syslog server +type SyslogServerState struct { + IsolationConfig *cstructs.IsolationConfig + Addr string } // UniversalExecutor is an implementation of the Executor which launches and // supervises processes. In addition to process supervision it provides resource // and file system isolation type UniversalExecutor struct { - cmd exec.Cmd - ctx *ExecutorContext + cmd exec.Cmd + ctx *ExecutorContext + command *ExecCommand taskDir string - groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} - lre *logging.FileRotator - lro *logging.FileRotator + + lre *logging.FileRotator + lro *logging.FileRotator + rotatorLock sync.Mutex + + syslogServer *logging.SyslogServer + syslogChan chan *logging.SyslogMessage + + groups *cgroupConfig.Cgroup + cgLock sync.Mutex logger *log.Logger - lock sync.Mutex } // NewExecutor returns an Executor func NewExecutor(logger *log.Logger) Executor { - return &UniversalExecutor{logger: logger, processExited: make(chan interface{})} + return &UniversalExecutor{ + logger: logger, + processExited: make(chan interface{}), + } } // LaunchCmd launches a process and returns it's state. It also configures an @@ -108,6 +132,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " ")) e.ctx = ctx + e.command = command // configuring the task dir if err := e.configureTaskDir(); err != nil { @@ -121,29 +146,18 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } // setting the user of the process - if e.ctx.UnprivilegedUser { - if err := e.runAs("nobody"); err != nil { + if command.User != "" { + if err := e.runAs(command.User); err != nil { return nil, err } } - logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - lro, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, e.logger) - - if err != nil { - return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) + // Setup the loggers + if err := e.configureLoggers(); err != nil { + return nil, err } - e.cmd.Stdout = lro - e.lro = lro - - lre, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, e.logger) - if err != nil { - return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) - } - e.cmd.Stderr = lre - e.lre = lre + e.cmd.Stdout = e.lro + e.cmd.Stderr = e.lre // setting the env, path and args for the command e.ctx.TaskEnv.Build() @@ -165,6 +179,32 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } +// configureLoggers sets up the standard out/error file rotators +func (e *UniversalExecutor) configureLoggers() error { + e.rotatorLock.Lock() + defer e.rotatorLock.Unlock() + + logFileSize := int64(e.ctx.Task.LogConfig.MaxFileSizeMB * 1024 * 1024) + if e.lro == nil { + lro, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", e.ctx.Task.Name), + e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger) + if err != nil { + return err + } + e.lro = lro + } + + if e.lre == nil { + lre, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", e.ctx.Task.Name), + e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger) + if err != nil { + return err + } + e.lre = lre + } + return nil +} + // Wait waits until a process has exited and returns it's exitcode and errors func (e *UniversalExecutor) Wait() (*ProcessState, error) { <-e.processExited @@ -173,7 +213,7 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { // UpdateLogConfig updates the log configuration func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { - e.ctx.LogConfig = logConfig + e.ctx.Task.LogConfig = logConfig if e.lro == nil { return fmt.Errorf("log rotator for stdout doesn't exist") } @@ -188,9 +228,22 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error return nil } +func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { + e.ctx.Task = task + fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024) + e.lro.MaxFiles = task.LogConfig.MaxFiles + e.lro.FileSize = fileSize + e.lre.MaxFiles = task.LogConfig.MaxFiles + e.lre.FileSize = fileSize + return nil +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() + if e.syslogServer != nil { + e.syslogServer.Shutdown() + } e.lre.Close() e.lro.Close() if err == nil { @@ -203,14 +256,6 @@ func (e *UniversalExecutor) wait() { exitCode = status.ExitStatus() } } - if e.ctx.FSIsolation { - e.removeChrootMounts() - } - if e.ctx.ResourceLimits { - e.lock.Lock() - DestroyCgroup(e.groups) - e.lock.Unlock() - } e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()} } @@ -224,7 +269,7 @@ var ( // process func (e *UniversalExecutor) Exit() error { var merr multierror.Error - if e.cmd.Process != nil { + if e.command != nil && e.cmd.Process != nil { proc, err := os.FindProcess(e.cmd.Process.Pid) if err != nil { e.logger.Printf("[ERR] executor: can't find process with pid: %v, err: %v", @@ -235,17 +280,17 @@ func (e *UniversalExecutor) Exit() error { } } - if e.ctx.FSIsolation { + if e.command != nil && e.command.FSIsolation { if err := e.removeChrootMounts(); err != nil { merr.Errors = append(merr.Errors, err) } } - if e.ctx.ResourceLimits { - e.lock.Lock() + if e.command != nil && e.command.ResourceLimits { + e.cgLock.Lock() if err := DestroyCgroup(e.groups); err != nil { merr.Errors = append(merr.Errors, err) } - e.lock.Unlock() + e.cgLock.Unlock() } return merr.ErrorOrNil() } @@ -270,10 +315,10 @@ func (e *UniversalExecutor) ShutDown() error { // configureTaskDir sets the task dir in the executor func (e *UniversalExecutor) configureTaskDir() error { - taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName] + taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name] e.taskDir = taskDir if !ok { - return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName) + return fmt.Errorf("couldn't find task directory for task %v", e.ctx.Task.Name) } e.cmd.Dir = taskDir return nil @@ -299,3 +344,48 @@ func (e *UniversalExecutor) makeExecutablePosix(binPath string) error { } return nil } + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func (e *UniversalExecutor) getListener(lowerBound uint, upperBound uint) (net.Listener, error) { + if runtime.GOOS == "windows" { + return e.listenerTCP(lowerBound, upperBound) + } + + return e.listenerUnix() +} + +// listenerTCP creates a TCP listener using an unused port between an upper and +// lower bound +func (e *UniversalExecutor) listenerTCP(lowerBound uint, upperBound uint) (net.Listener, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + return l, nil + } + return nil, fmt.Errorf("No free port found") +} + +// listenerUnix creates a Unix domain socket +func (e *UniversalExecutor) listenerUnix() (net.Listener, error) { + f, err := ioutil.TempFile("", "plugin") + if err != nil { + return nil, err + } + path := f.Name() + + if err := f.Close(); err != nil { + return nil, err + } + if err := os.Remove(path); err != nil { + return nil, err + } + + return net.Listen("unix", path) +} diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index 17a83d5d7..d6c4cf38b 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -38,7 +38,7 @@ var ( func (e *UniversalExecutor) makeExecutable(binPath string) error { path := binPath - if e.ctx.FSIsolation { + if e.command.FSIsolation { // The path must be relative the chroot path = filepath.Join(e.taskDir, binPath) } else if !filepath.IsAbs(binPath) { @@ -50,14 +50,14 @@ func (e *UniversalExecutor) makeExecutable(binPath string) error { // configureIsolation configures chroot and creates cgroups func (e *UniversalExecutor) configureIsolation() error { - if e.ctx.FSIsolation { + if e.command.FSIsolation { if err := e.configureChroot(); err != nil { return err } } - if e.ctx.ResourceLimits { - if err := e.configureCgroups(e.ctx.TaskResources); err != nil { + if e.command.ResourceLimits { + if err := e.configureCgroups(e.ctx.Task.Resources); err != nil { return fmt.Errorf("error creating cgroups: %v", err) } if err := e.applyLimits(os.Getpid()); err != nil { @@ -75,7 +75,7 @@ func (e *UniversalExecutor) configureIsolation() error { // applyLimits puts a process in a pre-configured cgroup func (e *UniversalExecutor) applyLimits(pid int) error { - if !e.ctx.ResourceLimits { + if !e.command.ResourceLimits { return nil } @@ -167,11 +167,11 @@ func (e *UniversalExecutor) runAs(userid string) error { // configureChroot configures a chroot func (e *UniversalExecutor) configureChroot() error { allocDir := e.ctx.AllocDir - if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil { + if err := allocDir.MountSharedDir(e.ctx.Task.Name); err != nil { return err } - if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil { + if err := allocDir.Embed(e.ctx.Task.Name, chrootEnv); err != nil { return err } @@ -195,8 +195,8 @@ func (e *UniversalExecutor) configureChroot() error { // should be called when tearing down the task. func (e *UniversalExecutor) removeChrootMounts() error { // Prevent a race between Wait/ForceStop - e.lock.Lock() - defer e.lock.Unlock() + e.cgLock.Lock() + defer e.cgLock.Unlock() return e.ctx.AllocDir.UnmountAll() } diff --git a/client/driver/executor/executor_posix.go b/client/driver/executor/executor_posix.go new file mode 100644 index 000000000..4be468b2c --- /dev/null +++ b/client/driver/executor/executor_posix.go @@ -0,0 +1,44 @@ +// +build !windows + +package executor + +import ( + "fmt" + "io" + "log/syslog" + + "github.com/hashicorp/nomad/client/driver/logging" +) + +func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) { + e.ctx = ctx + e.syslogChan = make(chan *logging.SyslogMessage, 2048) + l, err := e.getListener(e.ctx.PortLowerBound, e.ctx.PortUpperBound) + if err != nil { + return nil, err + } + e.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String()) + if err := e.configureLoggers(); err != nil { + return nil, err + } + + e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger) + go e.syslogServer.Start() + go e.collectLogs(e.lre, e.lro) + syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String()) + return &SyslogServerState{Addr: syslogAddr}, nil +} + +func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) { + for logParts := range e.syslogChan { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout + if logParts.Severity == syslog.LOG_ERR { + e.lre.Write(logParts.Message) + e.lre.Write([]byte{'\n'}) + } else { + e.lro.Write(logParts.Message) + e.lro.Write([]byte{'\n'}) + } + } +} diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 2c45d0b45..8798170e7 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -30,7 +30,7 @@ var ( } ) -func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { +func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] @@ -39,18 +39,16 @@ func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { log.Panicf("allocDir.Build() failed: %v", err) } - return task.Name, allocDir + return task, allocDir } func testExecutorContext(t *testing.T) *ExecutorContext { taskEnv := env.NewTaskEnvironment(mock.Node()) - taskName, allocDir := mockAllocDir(t) + task, allocDir := mockAllocDir(t) ctx := &ExecutorContext{ - TaskEnv: taskEnv, - TaskName: taskName, - AllocDir: allocDir, - TaskResources: constraint, - LogConfig: structs.DefaultLogConfig(), + TaskEnv: taskEnv, + Task: task, + AllocDir: allocDir, } return ctx } @@ -80,6 +78,9 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) { if ps.ExitCode < 1 { t.Fatalf("expected exit code to be non zero, actual: %v", ps.ExitCode) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } } func TestExecutor_Start_Wait(t *testing.T) { @@ -98,6 +99,9 @@ func TestExecutor_Start_Wait(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } expected := "hello world" file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") @@ -119,9 +123,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { ctx := testExecutorContext(t) defer ctx.AllocDir.Destroy() - ctx.FSIsolation = true - ctx.ResourceLimits = true - ctx.UnprivilegedUser = true + execCmd.FSIsolation = true + execCmd.ResourceLimits = true + execCmd.User = "nobody" executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) @@ -135,6 +139,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } expected := "hello world" file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") @@ -154,13 +161,13 @@ func TestExecutor_DestroyCgroup(t *testing.T) { execCmd := ExecCommand{Cmd: "/bin/bash", Args: []string{"-c", "/usr/bin/yes"}} ctx := testExecutorContext(t) - ctx.LogConfig.MaxFiles = 1 - ctx.LogConfig.MaxFileSizeMB = 300 + ctx.Task.LogConfig.MaxFiles = 1 + ctx.Task.LogConfig.MaxFileSizeMB = 300 defer ctx.AllocDir.Destroy() - ctx.FSIsolation = true - ctx.ResourceLimits = true - ctx.UnprivilegedUser = true + execCmd.FSIsolation = true + execCmd.ResourceLimits = true + execCmd.User = "nobody" executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) @@ -171,7 +178,10 @@ func TestExecutor_DestroyCgroup(t *testing.T) { t.Fatalf("expected process to start and have non zero pid") } time.Sleep(200 * time.Millisecond) - executor.Exit() + if err := executor.Exit(); err != nil { + t.Fatalf("err: %v", err) + } + file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") finfo, err := os.Stat(file) if err != nil { @@ -203,6 +213,9 @@ func TestExecutor_Start_Kill(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) diff --git a/client/driver/executor/executor_windows.go b/client/driver/executor/executor_windows.go new file mode 100644 index 000000000..e93f936e7 --- /dev/null +++ b/client/driver/executor/executor_windows.go @@ -0,0 +1,5 @@ +package executor + +func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) { + return nil, nil +} diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 1189c19dd..9e6fcda64 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -1,6 +1,7 @@ package driver import ( + "encoding/gob" "log" "net/rpc" @@ -9,6 +10,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Registering these types since we have to serialize and de-serialize the Task +// structs over the wire between drivers and the executor. +func init() { + gob.Register([]interface{}{}) + gob.Register(map[string]interface{}{}) + gob.Register([]map[string]string{}) +} + type ExecutorRPC struct { client *rpc.Client } @@ -19,12 +28,23 @@ type LaunchCmdArgs struct { Ctx *executor.ExecutorContext } +// LaunchSyslogServerArgs wraps the executor context for the purposes of RPC +type LaunchSyslogServerArgs struct { + Ctx *executor.ExecutorContext +} + func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) { var ps *executor.ProcessState err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) return ps, err } +func (e *ExecutorRPC) LaunchSyslogServer(ctx *executor.ExecutorContext) (*executor.SyslogServerState, error) { + var ss *executor.SyslogServerState + err := e.client.Call("Plugin.LaunchSyslogServer", LaunchSyslogServerArgs{Ctx: ctx}, &ss) + return ss, err +} + func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) { var ps executor.ProcessState err := e.client.Call("Plugin.Wait", new(interface{}), &ps) @@ -43,6 +63,10 @@ func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) } +func (e *ExecutorRPC) UpdateTask(task *structs.Task) error { + return e.client.Call("Plugin.UpdateTask", task, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor } @@ -55,6 +79,14 @@ func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessSt return err } +func (e *ExecutorRPCServer) LaunchSyslogServer(args LaunchSyslogServerArgs, ss *executor.SyslogServerState) error { + state, err := e.Impl.LaunchSyslogServer(args.Ctx) + if state != nil { + *ss = *state + } + return err +} + func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error { state, err := e.Impl.Wait() if state != nil { @@ -75,6 +107,10 @@ func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *inter return e.Impl.UpdateLogConfig(args) } +func (e *ExecutorRPCServer) UpdateTask(args *structs.Task, resp *interface{}) error { + return e.Impl.UpdateTask(args) +} + type ExecutorPlugin struct { logger *log.Logger Impl *ExecutorRPCServer diff --git a/client/driver/java.go b/client/driver/java.go index c8849f951..6d72eea4d 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -154,14 +154,9 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, - FSIsolation: true, - UnprivilegedUser: true, - ResourceLimits: true, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } absPath, err := GetAbsolutePath("java") @@ -169,7 +164,13 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } - ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx) + ps, err := execIntf.LaunchCmd(&executor.ExecCommand{ + Cmd: absPath, + Args: args, + FSIsolation: true, + ResourceLimits: true, + User: cstructs.DefaultUnpriviledgedUser, + }, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -290,7 +291,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { func (h *javaHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/logging/syslog_server_windows.go b/client/driver/logging/syslog_server_windows.go new file mode 100644 index 000000000..cc6a60840 --- /dev/null +++ b/client/driver/logging/syslog_server_windows.go @@ -0,0 +1,10 @@ +package logging + +type SyslogServer struct { +} + +func (s *SyslogServer) Shutdown() { +} + +type SyslogMessage struct { +} diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 1c8d89cf5..ba9b77397 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -191,11 +191,9 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) if err != nil { @@ -292,7 +290,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { func (h *qemuHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 3b1c87372..db7635959 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -96,11 +96,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil { @@ -195,7 +193,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { func (h *rawExecHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 3dea50b04..ab7eb6472 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -233,12 +233,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - UnprivilegedUser: false, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } absPath, err := GetAbsolutePath("rkt") @@ -329,7 +326,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { func (h *rktHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index cafd25c26..e304fd679 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -6,6 +6,10 @@ import ( cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) +const ( + DefaultUnpriviledgedUser = "nobody" +) + // WaitResult stores the result of a Wait operation. type WaitResult struct { ExitCode int