diff --git a/.travis.yml b/.travis.yml index 59038bceb..782976944 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,9 +9,6 @@ language: go go: - 1.6 -env: - - DOCKER_VERSION=1.9.1 - matrix: allow_failures: - go: tip @@ -21,7 +18,7 @@ branches: - master install: - - make prepare_docker + - make travis - make bootstrap script: diff --git a/GNUmakefile b/GNUmakefile index 769a74c02..26fadfc83 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -72,13 +72,8 @@ bootstrap: go get $$tool; \ done -prepare_docker: - sudo stop docker - sudo rm -rf /var/lib/docker - sudo rm -f `which docker` - sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D - echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" | sudo tee /etc/apt/sources.list.d/docker.list - sudo apt-get update - sudo apt-get install docker-engine=$(DOCKER_VERSION)-0~$(shell lsb_release -cs) -y --force-yes +travis: + @sh -c "'$(PWD)/scripts/update_docker.sh'" + @sh -c "'$(PWD)/scripts/install_rkt.sh'" .PHONY: all bin cov integ test vet web web-push test-nodep diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 2b0f6218a..574fc74f2 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log" - "os" "os/exec" "path/filepath" "regexp" @@ -14,11 +13,14 @@ import ( "syscall" "time" + "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -53,20 +55,23 @@ type RktDriverConfig struct { // rktHandle is returned from Start/Open as a handle to the PID type rktHandle struct { - proc *os.Process - image string - logger *log.Logger - killTimeout time.Duration - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + executorPid int + executor executor.Executor + allocDir *allocdir.AllocDir + logger *log.Logger + killTimeout time.Duration + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // rktPID is a struct to map the pid running the process to the vm image on // disk type rktPID struct { - Pid int - Image string - KillTimeout time.Duration + PluginConfig *PluginReattachConfig + AllocDir *allocdir.AllocDir + ExecutorPid int + KillTimeout time.Duration } // NewRktDriver is used to create a new exec driver @@ -125,16 +130,16 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e if !ok { return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) // Build the command. var cmdArgs []string // Add the given trust prefix trustPrefix, trustCmd := task.Config["trust_prefix"] + insecure := false if trustCmd { var outBuf, errBuf bytes.Buffer - cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trustPrefix)) + cmd := exec.Command("rkt", "trust", "--skip-fingerprint-review=true", fmt.Sprintf("--prefix=%s", trustPrefix)) cmd.Stdout = &outBuf cmd.Stderr = &errBuf if err := cmd.Run(); err != nil { @@ -144,26 +149,22 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trustPrefix) } else { // Disble signature verification if the trust command was not run. + insecure = true + } + + cmdArgs = append(cmdArgs, "run", "--interactive") + cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", task.Name, ctx.AllocDir.SharedDir)) + cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", task.Name, ctx.AllocDir.SharedDir)) + cmdArgs = append(cmdArgs, img) + if insecure == true { cmdArgs = append(cmdArgs, "--insecure-options=all") } - d.taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)). - SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build() - + // Inject enviornment variables for k, v := range d.taskEnv.EnvMap() { cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v)) } - // Append the run command. - cmdArgs = append(cmdArgs, "run", "--mds-register=false", img) - - // Mount allc and task dirs - local, ok := ctx.AllocDir.TaskDirs[task.Name] - if !ok { - return nil, fmt.Errorf("Failed to find task local directory: %v", task.Name) - } - cmdArgs = append(cmdArgs, fmt.Sprintf("--volume %s,kind=empty,readOnly=false,source=%s --mount volume=data,target=%s", task.Name, local, ctx.AllocDir.SharedDir)) - // Check if the user has overriden the exec command. if execCmd, ok := task.Config["command"]; ok { cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", execCmd)) @@ -196,36 +197,45 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e } } - // Create files to capture stdin and out. - stdoutFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stdout", taskName)) - stderrFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stderr", taskName)) - - stdo, err := os.OpenFile(stdoutFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + bin, err := discover.NomadExecutable() if err != nil { - return nil, fmt.Errorf("Error opening file to redirect stdout: %v", err) + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - stde, err := os.OpenFile(stderrFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "executor", pluginLogFile), + } + + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { - return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err) + return nil, err + } + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, + UnprivilegedUser: false, + LogConfig: task.LogConfig, } - cmd := exec.Command("rkt", cmdArgs...) - cmd.Stdout = stdo - cmd.Stderr = stde - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("Error running rkt: %v", err) + ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "rkt", Args: cmdArgs}, executorCtx) + if err != nil { + pluginClient.Kill() + return nil, fmt.Errorf("error starting process via the plugin: %v", err) } - d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmd.Args) + d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmdArgs) h := &rktHandle{ - proc: cmd.Process, - image: img, - logger: d.logger, - killTimeout: d.DriverContext.KillTimeout(task), - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + executorPid: ps.Pid, + allocDir: ctx.AllocDir, + logger: d.logger, + killTimeout: d.DriverContext.KillTimeout(task), + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -239,20 +249,28 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error return nil, fmt.Errorf("failed to parse Rkt handle '%s': %v", handleID, err) } - // Find the process - proc, err := os.FindProcess(qpid.Pid) - if proc == nil || err != nil { - return nil, fmt.Errorf("failed to find Rkt PID %d: %v", qpid.Pid, err) + pluginConfig := &plugin.ClientConfig{ + Reattach: qpid.PluginConfig.PluginConfig(), + } + executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + d.logger.Println("[ERROR] driver.rkt: error connecting to plugin so destroying plugin pid and user pid") + if e := destroyPlugin(qpid.PluginConfig.Pid, qpid.ExecutorPid); e != nil { + d.logger.Printf("[ERROR] driver.rkt: error destroying plugin and executor pid: %v", e) + } + return nil, fmt.Errorf("error connecting to plugin: %v", err) } // Return a driver handle h := &rktHandle{ - proc: proc, - image: qpid.Image, - logger: d.logger, - killTimeout: qpid.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executorPid: qpid.ExecutorPid, + allocDir: qpid.AllocDir, + executor: executor, + logger: d.logger, + killTimeout: qpid.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -262,9 +280,10 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error func (h *rktHandle) ID() string { // Return a handle to the PID pid := &rktPID{ - Pid: h.proc.Pid, - Image: h.image, - KillTimeout: h.killTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), + KillTimeout: h.killTimeout, + ExecutorPid: h.executorPid, + AllocDir: h.allocDir, } data, err := json.Marshal(pid) if err != nil { @@ -280,6 +299,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { func (h *rktHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil @@ -288,23 +308,27 @@ func (h *rktHandle) Update(task *structs.Task) error { // Kill is used to terminate the task. We send an Interrupt // and then provide a 5 second grace period before doing a Kill. func (h *rktHandle) Kill() error { - h.proc.Signal(os.Interrupt) + h.executor.ShutDown() select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.proc.Kill() + return h.executor.Exit() } } func (h *rktHandle) run() { - ps, err := h.proc.Wait() + ps, err := h.executor.Wait() close(h.doneCh) - code := 0 - if !ps.Success() { - // TODO: Better exit code parsing. - code = 1 + if ps.ExitCode == 0 && err != nil { + if e := killProcess(h.executorPid); e != nil { + h.logger.Printf("[ERROR] driver.rkt: error killing user process: %v", e) + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.rkt: unmounting dev,proc and alloc dirs failed: %v", e) + } } - h.waitCh <- cstructs.NewWaitResult(code, 0, err) + h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err) close(h.waitCh) + h.pluginClient.Kill() } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index f3ee857fe..c2bbd600c 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -3,17 +3,16 @@ package driver import ( "fmt" "io/ioutil" - "os" "path/filepath" + "reflect" "testing" "time" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/env" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - cstructs "github.com/hashicorp/nomad/client/driver/structs" ctestutils "github.com/hashicorp/nomad/client/testutil" ) @@ -32,22 +31,6 @@ func TestRktVersionRegex(t *testing.T) { } } -func TestRktDriver_Handle(t *testing.T) { - h := &rktHandle{ - proc: &os.Process{Pid: 123}, - image: "foo", - killTimeout: 5 * time.Nanosecond, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), - } - - actual := h.ID() - expected := `Rkt:{"Pid":123,"Image":"foo","KillTimeout":5}` - if actual != expected { - t.Errorf("Expected `%s`, found `%s`", expected, actual) - } -} - // The fingerprinter test should always pass, even if rkt is not installed. func TestRktDriver_Fingerprint(t *testing.T) { ctestutils.RktCompatible(t) @@ -84,14 +67,19 @@ func TestRktDriver_Start(t *testing.T) { "image": "coreos.com/etcd:v2.0.4", "command": "/etcd", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ - MemoryMB: 256, - CPU: 512, + MemoryMB: 128, + CPU: 100, }, } driverCtx, execCtx := testDriverContexts(task) defer execCtx.AllocDir.Destroy() + d := NewRktDriver(driverCtx) handle, err := d.Start(execCtx, task) @@ -101,6 +89,7 @@ func TestRktDriver_Start(t *testing.T) { if handle == nil { t.Fatalf("missing handle") } + defer handle.Kill() // Attempt to open handle2, err := d.Open(execCtx, handle.ID()) @@ -110,11 +99,6 @@ func TestRktDriver_Start(t *testing.T) { if handle2 == nil { t.Fatalf("missing handle") } - - // Clean up - if err := handle.Kill(); err != nil { - fmt.Printf("\nError killing Rkt test: %s", err) - } } func TestRktDriver_Start_Wait(t *testing.T) { @@ -127,9 +111,13 @@ func TestRktDriver_Start_Wait(t *testing.T) { "command": "/etcd", "args": []string{"--version"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ - MemoryMB: 256, - CPU: 512, + MemoryMB: 128, + CPU: 100, }, } @@ -171,9 +159,13 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { "command": "/etcd", "args": []string{"--version"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ - MemoryMB: 256, - CPU: 512, + MemoryMB: 128, + CPU: 100, }, } @@ -206,19 +198,29 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { } } -func TestRktDriver_Start_Wait_Logs(t *testing.T) { +func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { ctestutils.RktCompatible(t) + + exp := []byte{'w', 'i', 'n'} + file := "output.txt" + task := &structs.Task{ - Name: "etcd", + Name: "alpine", Config: map[string]interface{}{ - "trust_prefix": "coreos.com/etcd", - "image": "coreos.com/etcd:v2.0.4", - "command": "/etcd", - "args": []string{"--version"}, + "image": "docker://alpine", + "command": "/bin/sh", + "args": []string{ + "-c", + fmt.Sprintf(`echo -n %s > ${%s}/%s`, string(exp), env.AllocDir, file), + }, + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, }, Resources: &structs.Resources{ - MemoryMB: 256, - CPU: 512, + MemoryMB: 128, + CPU: 100, }, } @@ -244,17 +246,14 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) { t.Fatalf("timeout") } - taskDir, ok := execCtx.AllocDir.TaskDirs[task.Name] - if !ok { - t.Fatalf("Could not find task directory for task: %v", task) - } - stdout := filepath.Join(taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", task.Name)) - data, err := ioutil.ReadFile(stdout) - if err != nil { - t.Fatalf("Failed to read tasks stdout: %v", err) - } + // Check that data was written to the shared alloc directory. + outputFile := filepath.Join(execCtx.AllocDir.SharedDir, file) + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } - if len(data) == 0 { - t.Fatal("Task's stdout is empty") - } + if !reflect.DeepEqual(act, exp) { + t.Fatalf("Command output is %v; expected %v", act, exp) + } } diff --git a/scripts/install_rkt.sh b/scripts/install_rkt.sh new file mode 100755 index 000000000..d3987e480 --- /dev/null +++ b/scripts/install_rkt.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -ex + +RKT_VERSION="v1.0.0" +DEST_DIR="/usr/local/bin" + +sudo mkdir -p /etc/rkt/net.d +echo '{"name": "default", "type": "ptp", "ipMasq": false, "ipam": { "type": "host-local", "subnet": "172.16.28.0/24", "routes": [ { "dst": "0.0.0.0/0" } ] } }' | sudo tee -a /etc/rkt/net.d/99-network.conf + +wget https://github.com/coreos/rkt/releases/download/$RKT_VERSION/rkt-$RKT_VERSION.tar.gz +tar xzvf rkt-$RKT_VERSION.tar.gz +sudo cp rkt-$RKT_VERSION/rkt $DEST_DIR +sudo cp rkt-$RKT_VERSION/*.aci $DEST_DIR + +rkt version diff --git a/scripts/update_docker.sh b/scripts/update_docker.sh new file mode 100755 index 000000000..e1cc66176 --- /dev/null +++ b/scripts/update_docker.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -ex + +DOCKER_VERSION="1.9.1" + +sudo stop docker +sudo rm -rf /var/lib/docker +sudo rm -f `which docker` +sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D +echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" | sudo tee /etc/apt/sources.list.d/docker.list +sudo apt-get update +sudo apt-get install docker-engine=$DOCKER_VERSION-0~`lsb_release -cs` -y --force-yes + +docker version