diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..6bf7bd245 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +language: go + +go: + - 1.5.1 + - tip + +matrix: + allow_failures: + - go: tip + +branches: + only: + - master + +install: + - make bootstrap + +script: + - make test diff --git a/CHANGELOG.md b/CHANGELOG.md index 74570c350..786380eef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ IMPROVEMENTS: * Nomad client cleans allocations on exit when in dev mode + * drivers: Use go-getter for artifact retrieval, add artifact support to Exec, Raw Exec drivers [GH-288] ## 0.1.1 (October 5, 2015) diff --git a/api/agent.go b/api/agent.go index 7633a4538..32c0c87d5 100644 --- a/api/agent.go +++ b/api/agent.go @@ -178,3 +178,22 @@ type AgentMember struct { DelegateMax uint8 DelegateCur uint8 } + +// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort +// based on the Name, DC and Region +type AgentMembersNameSort []*AgentMember + +func (a AgentMembersNameSort) Len() int { return len(a) } +func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a AgentMembersNameSort) Less(i, j int) bool { + if a[i].Tags["region"] != a[j].Tags["region"] { + return a[i].Tags["region"] < a[j].Tags["region"] + } + + if a[i].Tags["dc"] != a[j].Tags["dc"] { + return a[i].Tags["dc"] < a[j].Tags["dc"] + } + + return a[i].Name < a[j].Name + +} diff --git a/api/agent_test.go b/api/agent_test.go index adb55640c..d83ff2cf2 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1,6 +1,8 @@ package api import ( + "reflect" + "sort" "testing" "github.com/hashicorp/nomad/testutil" @@ -154,3 +156,117 @@ func TestAgent_SetServers(t *testing.T) { t.Fatalf("bad server list: %v", out) } } + +func (a *AgentMember) String() string { + return "{Name: " + a.Name + " Region: " + a.Tags["region"] + " DC: " + a.Tags["dc"] + "}" +} + +func TestAgents_Sort(t *testing.T) { + var sortTests = []struct { + in []*AgentMember + out []*AgentMember + }{ + { + []*AgentMember{ + &AgentMember{Name: "nomad-2.vac.us-east", + Tags: map[string]string{"region": "us-east", "dc": "us-east-1c"}}, + &AgentMember{Name: "nomad-1.global", + Tags: map[string]string{"region": "global", "dc": "dc1"}}, + &AgentMember{Name: "nomad-1.vac.us-east", + Tags: map[string]string{"region": "us-east", "dc": "us-east-1c"}}, + }, + []*AgentMember{ + &AgentMember{Name: "nomad-1.global", + Tags: map[string]string{"region": "global", "dc": "dc1"}}, + &AgentMember{Name: "nomad-1.vac.us-east", + Tags: map[string]string{"region": "us-east", "dc": "us-east-1c"}}, + &AgentMember{Name: "nomad-2.vac.us-east", + Tags: map[string]string{"region": "us-east", "dc": "us-east-1c"}}, + }, + }, + { + []*AgentMember{ + &AgentMember{Name: "nomad-02.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-02.pal.us-west", + Tags: map[string]string{"region": "us-west", "dc": "palo_alto"}}, + &AgentMember{Name: "nomad-01.pal.us-west", + Tags: map[string]string{"region": "us-west", "dc": "palo_alto"}}, + &AgentMember{Name: "nomad-01.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + }, + []*AgentMember{ + &AgentMember{Name: "nomad-01.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-02.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-01.pal.us-west", + Tags: map[string]string{"region": "us-west", "dc": "palo_alto"}}, + &AgentMember{Name: "nomad-02.pal.us-west", + Tags: map[string]string{"region": "us-west", "dc": "palo_alto"}}, + }, + }, + { + []*AgentMember{ + &AgentMember{Name: "nomad-02.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-02.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-01.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-01.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + }, + []*AgentMember{ + &AgentMember{Name: "nomad-01.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-02.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-01.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + &AgentMember{Name: "nomad-02.tam.us-east", + Tags: map[string]string{"region": "us-east", "dc": "tampa"}}, + }, + }, + { + []*AgentMember{ + &AgentMember{Name: "nomad-02.ber.europe", + Tags: map[string]string{"region": "europe", "dc": "berlin"}}, + &AgentMember{Name: "nomad-02.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-01.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-01.ber.europe", + Tags: map[string]string{"region": "europe", "dc": "berlin"}}, + }, + []*AgentMember{ + &AgentMember{Name: "nomad-01.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-02.ams.europe", + Tags: map[string]string{"region": "europe", "dc": "amsterdam"}}, + &AgentMember{Name: "nomad-01.ber.europe", + Tags: map[string]string{"region": "europe", "dc": "berlin"}}, + &AgentMember{Name: "nomad-02.ber.europe", + Tags: map[string]string{"region": "europe", "dc": "berlin"}}, + }, + }, + { + []*AgentMember{ + &AgentMember{Name: "nomad-1.global"}, + &AgentMember{Name: "nomad-3.global"}, + &AgentMember{Name: "nomad-2.global"}, + }, + []*AgentMember{ + &AgentMember{Name: "nomad-1.global"}, + &AgentMember{Name: "nomad-2.global"}, + &AgentMember{Name: "nomad-3.global"}, + }, + }, + } + for _, tt := range sortTests { + sort.Sort(AgentMembersNameSort(tt.in)) + if !reflect.DeepEqual(tt.in, tt.out) { + t.Errorf("\necpected: %s\nget : %s", tt.in, tt.out) + } + } +} diff --git a/api/api.go b/api/api.go index f37873c20..5ca63aad7 100644 --- a/api/api.go +++ b/api/api.go @@ -10,6 +10,8 @@ import ( "os" "strconv" "time" + + "github.com/hashicorp/go-cleanhttp" ) // QueryOptions are used to parameterize a query @@ -86,7 +88,7 @@ type Config struct { func DefaultConfig() *Config { config := &Config{ Address: "http://127.0.0.1:4646", - HttpClient: http.DefaultClient, + HttpClient: cleanhttp.DefaultClient(), } if addr := os.Getenv("NOMAD_ADDR"); addr != "" { config.Address = addr diff --git a/api/jobs.go b/api/jobs.go index f1cf189bf..17e75daff 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -2,6 +2,7 @@ package api import ( "sort" + "time" ) const ( @@ -100,6 +101,12 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, return resp.EvalID, wm, nil } +//UpdateStrategy is for serializing update strategy for a job. +type UpdateStrategy struct { + Stagger time.Duration + MaxParallel int +} + // Job is used to serialize a job. type Job struct { Region string @@ -111,6 +118,7 @@ type Job struct { Datacenters []string Constraints []*Constraint TaskGroups []*TaskGroup + Update *UpdateStrategy Meta map[string]string Status string StatusDescription string diff --git a/client/alloc_runner.go b/client/alloc_runner.go index f4613ad67..f41be4558 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -282,7 +282,11 @@ func (r *AllocRunner) Run() { // Create the execution context if r.ctx == nil { allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID)) - allocDir.Build(tg.Tasks) + if err := allocDir.Build(tg.Tasks); err != nil { + r.logger.Printf("[WARN] client: failed to build task directories: %v", err) + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) + return + } r.ctx = driver.NewExecContext(allocDir) } diff --git a/client/driver/docker.go b/client/driver/docker.go index 2b4fdd78e..bbd52a9d8 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -4,12 +4,15 @@ import ( "encoding/json" "fmt" "log" + "path/filepath" "strconv" "strings" docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/nomad/structs" ) @@ -37,11 +40,36 @@ func NewDockerDriver(ctx *DriverContext) Driver { return &DockerDriver{*ctx} } +// dockerClient creates *docker.Client. In test / dev mode we can use ENV vars +// to connect to the docker daemon. In production mode we will read +// docker.endpoint from the config file. +func (d *DockerDriver) dockerClient() (*docker.Client, error) { + // In dev mode, read DOCKER_* environment variables DOCKER_HOST, + // DOCKER_TLS_VERIFY, and DOCKER_CERT_PATH. This allows you to run tests and + // demo against boot2docker or a VM on OSX and Windows. This falls back on + // the default unix socket on linux if tests are run on linux. + // + // Also note that we need to turn on DevMode in the test configs. + if d.config.DevMode { + return docker.NewClientFromEnv() + } + + // In prod mode we'll read the docker.endpoint configuration and fall back + // on the host-specific default. We do not read from the environment. + defaultEndpoint, err := docker.DefaultDockerHost() + if err != nil { + return nil, fmt.Errorf("Unable to determine default docker endpoint: %s", err) + } + dockerEndpoint := d.config.ReadDefault("docker.endpoint", defaultEndpoint) + + return docker.NewClient(dockerEndpoint) +} + func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { // Initialize docker API client - dockerEndpoint := d.config.ReadDefault("docker.endpoint", "unix:///var/run/docker.sock") - client, err := docker.NewClient(dockerEndpoint) + client, err := d.dockerClient() if err != nil { + d.logger.Printf("[DEBUG] driver.docker: could not connect to docker daemon: %v", err) return false, nil } @@ -56,6 +84,7 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool env, err := client.Version() if err != nil { + d.logger.Printf("[DEBUG] driver.docker: could not read version from daemon: %v", err) // Check the "no such file" error if the unix file is missing if strings.Contains(err.Error(), "no such file") { return false, nil @@ -65,18 +94,39 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool // is broken. return false, err } - node.Attributes["driver.docker"] = "true" + node.Attributes["driver.docker"] = "1" node.Attributes["driver.docker.version"] = env.Get("Version") return true, nil } -// We have to call this when we create the container AND when we start it so -// we'll make a function. -func createHostConfig(task *structs.Task) *docker.HostConfig { - // hostConfig holds options for the docker container that are unique to this - // machine, such as resource limits and port mappings - return &docker.HostConfig{ +func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Task) ([]string, error) { + shared := alloc.SharedDir + local, ok := alloc.TaskDirs[task.Name] + if !ok { + return nil, fmt.Errorf("Failed to find task local directory: %v", task.Name) + } + + return []string{ + fmt.Sprintf("%s:%s", shared, allocdir.SharedAllocName), + fmt.Sprintf("%s:%s", local, allocdir.TaskLocal), + }, nil +} + +// createContainer initializes a struct needed to call docker.client.CreateContainer() +func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (docker.CreateContainerOptions, error) { + var c docker.CreateContainerOptions + if task.Resources == nil { + d.logger.Printf("[ERR] driver.docker: task.Resources is empty") + return c, fmt.Errorf("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.") + } + + binds, err := d.containerBinds(ctx.AllocDir, task) + if err != nil { + return c, err + } + + hostConfig := &docker.HostConfig{ // Convert MB to bytes. This is an absolute value. // // This value represents the total amount of memory a process can use. @@ -105,40 +155,38 @@ func createHostConfig(task *structs.Task) *docker.HostConfig { // - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt // - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt CPUShares: int64(task.Resources.CPU), - } -} -// createContainer initializes a struct needed to call docker.client.CreateContainer() -func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) docker.CreateContainerOptions { - if task.Resources == nil { - panic("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.") + // Binds are used to mount a host volume into the container. We mount a + // local directory for storage and a shared alloc directory that can be + // used to share data between different tasks in the same task group. + Binds: binds, } - hostConfig := createHostConfig(task) - logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"]) - logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"]) + d.logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"]) + d.logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"]) + d.logger.Printf("[DEBUG] driver.docker: binding directories %#v for %s", hostConfig.Binds, task.Config["image"]) mode, ok := task.Config["network_mode"] if !ok || mode == "" { // docker default - logger.Printf("[WARN] driver.docker: no mode specified for networking, defaulting to bridge") + d.logger.Printf("[WARN] driver.docker: no mode specified for networking, defaulting to bridge") mode = "bridge" } // Ignore the container mode for now switch mode { case "default", "bridge", "none", "host": - logger.Printf("[DEBUG] driver.docker: using %s as network mode", mode) + d.logger.Printf("[DEBUG] driver.docker: using %s as network mode", mode) default: - logger.Printf("[WARN] invalid setting for network mode %s, defaulting to bridge mode on docker0", mode) - mode = "bridge" + d.logger.Printf("[ERR] driver.docker: invalid setting for network mode: %s", mode) + return c, fmt.Errorf("Invalid setting for network mode: %s", mode) } hostConfig.NetworkMode = mode // Setup port mapping (equivalent to -p on docker CLI). Ports must already be // exposed in the container. if len(task.Resources.Networks) == 0 { - logger.Print("[WARN] driver.docker: No networks are available for port mapping") + d.logger.Print("[WARN] driver.docker: No networks are available for port mapping") } else { network := task.Resources.Networks[0] dockerPorts := map[docker.Port][]docker.PortBinding{} @@ -146,7 +194,7 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d for _, port := range network.ListStaticPorts() { dockerPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} dockerPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} - logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (static)\n", network.IP, port, port) + d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (static)\n", network.IP, port, port) } for label, port := range network.MapDynamicPorts() { @@ -160,30 +208,47 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d if _, err := strconv.Atoi(label); err == nil { dockerPorts[docker.Port(label+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} dockerPorts[docker.Port(label+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} - logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %s (mapped)", network.IP, port, label) + d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %s (mapped)", network.IP, port, label) } else { dockerPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} dockerPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}} - logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d for label %s\n", network.IP, port, port, label) + d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d for label %s\n", network.IP, port, port, label) } } hostConfig.PortBindings = dockerPorts } + // Create environment variables. + env := TaskEnvironmentVariables(ctx, task) + env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)) + env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)) + config := &docker.Config{ - Env: TaskEnvironmentVariables(ctx, task).List(), + Env: env.List(), Image: task.Config["image"], } + rawArgs, hasArgs := task.Config["args"] + parsedArgs, err := args.ParseAndReplace(rawArgs, env.Map()) + if err != nil { + return c, err + } + // If the user specified a custom command to run, we'll inject it here. if command, ok := task.Config["command"]; ok { - config.Cmd = strings.Split(command, " ") + cmd := []string{command} + if hasArgs { + cmd = append(cmd, parsedArgs...) + } + config.Cmd = cmd + } else if hasArgs { + d.logger.Println("[DEBUG] driver.docker: ignoring args because command not specified") } return docker.CreateContainerOptions{ Config: config, HostConfig: hostConfig, - } + }, nil } func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -212,10 +277,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle } // Initialize docker API client - dockerEndpoint := d.config.ReadDefault("docker.endpoint", "unix:///var/run/docker.sock") - client, err := docker.NewClient(dockerEndpoint) + client, err := d.dockerClient() if err != nil { - return nil, fmt.Errorf("Failed to connect to docker.endpoint (%s): %s", dockerEndpoint, err) + return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) } repo, tag := docker.ParseRepositoryTag(image) @@ -258,8 +322,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[DEBUG] driver.docker: using image %s", dockerImage.ID) d.logger.Printf("[INFO] driver.docker: identified image %s as %s", image, dockerImage.ID) + config, err := d.createContainer(ctx, task) + if err != nil { + d.logger.Printf("[ERR] driver.docker: %s", err) + return nil, fmt.Errorf("Failed to create container config for image %s", image) + } // Create a container - container, err := client.CreateContainer(createContainer(ctx, task, d.logger)) + container, err := client.CreateContainer(config) if err != nil { d.logger.Printf("[ERR] driver.docker: %s", err) return nil, fmt.Errorf("Failed to create container from image %s", image) @@ -309,10 +378,9 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID) // Initialize docker API client - dockerEndpoint := d.config.ReadDefault("docker.endpoint", "unix:///var/run/docker.sock") - client, err := docker.NewClient(dockerEndpoint) + client, err := d.dockerClient() if err != nil { - return nil, fmt.Errorf("Failed to connect to docker.endpoint (%s): %s", dockerEndpoint, err) + return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) } // Look for a running container with this ID @@ -401,6 +469,7 @@ func (h *dockerHandle) Kill() error { err = h.client.RemoveImage(h.imageID) if err != nil { containers, err := h.client.ListContainers(docker.ListContainersOptions{ + // The image might be in use by a stopped container, so check everything All: true, Filters: map[string][]string{ "image": []string{h.imageID}, diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 6af8e28d1..53052bd7e 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -1,14 +1,25 @@ package driver import ( + "fmt" + "io/ioutil" "os/exec" + "path/filepath" + "reflect" "testing" "time" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/nomad/structs" ) +func testDockerDriverContext(task string) *DriverContext { + cfg := testConfig() + cfg.DevMode = true + return NewDriverContext(task, cfg, cfg.Node, testLogger()) +} + // dockerLocated looks to see whether docker is available on this system before // we try to run tests. We'll keep it simple and just check for the CLI. func dockerLocated() bool { @@ -33,7 +44,7 @@ func TestDockerDriver_Handle(t *testing.T) { // The fingerprinter test should always pass, even if Docker is not installed. func TestDockerDriver_Fingerprint(t *testing.T) { - d := NewDockerDriver(testDriverContext("")) + d := NewDockerDriver(testDockerDriverContext("")) node := &structs.Node{ Attributes: make(map[string]string), } @@ -44,7 +55,7 @@ func TestDockerDriver_Fingerprint(t *testing.T) { if apply != dockerLocated() { t.Fatalf("Fingerprinter should detect Docker when it is installed") } - if node.Attributes["driver.docker"] == "" { + if node.Attributes["driver.docker"] != "1" { t.Log("Docker not found. The remainder of the docker tests will be skipped.") } t.Logf("Found docker version %s", node.Attributes["driver.docker.version"]) @@ -56,14 +67,14 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { } task := &structs.Task{ - Name: "python-demo", + Name: "redis-demo", Config: map[string]string{ "image": "redis", }, Resources: basicResources, } - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) @@ -93,10 +104,11 @@ func TestDockerDriver_Start_Wait(t *testing.T) { } task := &structs.Task{ - Name: "python-demo", + Name: "redis-demo", Config: map[string]string{ "image": "redis", - "command": "redis-server -v", + "command": "redis-server", + "args": "-v", }, Resources: &structs.Resources{ MemoryMB: 256, @@ -104,7 +116,7 @@ func TestDockerDriver_Start_Wait(t *testing.T) { }, } - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) @@ -134,21 +146,77 @@ func TestDockerDriver_Start_Wait(t *testing.T) { } } +func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { + if !dockerLocated() { + t.SkipNow() + } + + exp := []byte{'w', 'i', 'n'} + file := "output.txt" + task := &structs.Task{ + Name: "redis-demo", + Config: map[string]string{ + "image": "redis", + "command": "/bin/bash", + "args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file), + }, + Resources: &structs.Resources{ + MemoryMB: 256, + CPU: 512, + }, + } + + driverCtx := testDockerDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + d := NewDockerDriver(driverCtx) + + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + defer handle.Kill() + + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } + + // Check that data was written to the shared alloc directory. + outputFile := filepath.Join(ctx.AllocDir.SharedDir, file) + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } + + if !reflect.DeepEqual(act, exp) { + t.Fatalf("Command outputted %v; want %v", act, exp) + } +} + func TestDockerDriver_Start_Kill_Wait(t *testing.T) { if !dockerLocated() { t.SkipNow() } task := &structs.Task{ - Name: "python-demo", + Name: "redis-demo", Config: map[string]string{ "image": "redis", - "command": "sleep 10", + "command": "/bin/sleep", + "args": "10", }, Resources: basicResources, } - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) @@ -182,6 +250,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { func taskTemplate() *structs.Task { return &structs.Task{ + Name: "redis-demo", Config: map[string]string{ "image": "redis", }, @@ -222,7 +291,7 @@ func TestDocker_StartN(t *testing.T) { // Let's spin up a bunch of things var err error for idx, task := range taskList { - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) @@ -236,6 +305,11 @@ func TestDocker_StartN(t *testing.T) { t.Log("==> All tasks are started. Terminating...") for idx, handle := range handles { + if handle == nil { + t.Errorf("Bad handle for task #%d", idx+1) + continue + } + err := handle.Kill() if err != nil { t.Errorf("Failed stopping task #%d: %s", idx+1, err) @@ -271,7 +345,7 @@ func TestDocker_StartNVersions(t *testing.T) { // Let's spin up a bunch of things var err error for idx, task := range taskList { - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) @@ -285,6 +359,11 @@ func TestDocker_StartNVersions(t *testing.T) { t.Log("==> All tasks are started. Terminating...") for idx, handle := range handles { + if handle == nil { + t.Errorf("Bad handle for task #%d", idx+1) + continue + } + err := handle.Kill() if err != nil { t.Errorf("Failed stopping task #%d: %s", idx+1, err) @@ -300,6 +379,7 @@ func TestDockerHostNet(t *testing.T) { } task := &structs.Task{ + Name: "redis-demo", Config: map[string]string{ "image": "redis", "network_mode": "host", @@ -309,7 +389,7 @@ func TestDockerHostNet(t *testing.T) { CPU: 512, }, } - driverCtx := testDriverContext(task.Name) + driverCtx := testDockerDriverContext(task.Name) ctx := testDriverExecContext(task, driverCtx) defer ctx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) diff --git a/client/driver/driver.go b/client/driver/driver.go index 46b18538c..dd4fcf43c 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -3,6 +3,7 @@ package driver import ( "fmt" "log" + "path/filepath" "sync" "github.com/hashicorp/nomad/client/allocdir" @@ -15,11 +16,12 @@ import ( // BuiltinDrivers contains the built in registered drivers // which are available for allocation handling var BuiltinDrivers = map[string]Factory{ - "docker": NewDockerDriver, - "exec": NewExecDriver, - "java": NewJavaDriver, - "qemu": NewQemuDriver, - "rkt": NewRktDriver, + "docker": NewDockerDriver, + "exec": NewExecDriver, + "raw_exec": NewRawExecDriver, + "java": NewJavaDriver, + "qemu": NewQemuDriver, + "rkt": NewRktDriver, } // NewDriver is used to instantiate and return a new driver @@ -112,7 +114,13 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment. env.SetMeta(task.Meta) if ctx.AllocDir != nil { - env.SetAllocDir(ctx.AllocDir.AllocDir) + env.SetAllocDir(ctx.AllocDir.SharedDir) + taskdir, ok := ctx.AllocDir.TaskDirs[task.Name] + if !ok { + // TODO: Update this to return an error + } + + env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal)) } if task.Resources != nil { diff --git a/client/driver/environment/vars.go b/client/driver/environment/vars.go index 8e2d698ed..7ed645374 100644 --- a/client/driver/environment/vars.go +++ b/client/driver/environment/vars.go @@ -12,6 +12,10 @@ const ( // group. AllocDir = "NOMAD_ALLOC_DIR" + // The path to the tasks local directory where it can store data that is + // persisted to the alloc is removed. + TaskLocalDir = "NOMAD_TASK_DIR" + // The tasks memory limit in MBs. MemLimit = "NOMAD_MEMORY_LIMIT" @@ -30,6 +34,10 @@ const ( MetaPrefix = "NOMAD_META_" ) +var ( + nomadVars = []string{AllocDir, TaskLocalDir, MemLimit, CpuLimit, TaskIP, PortPrefix, MetaPrefix} +) + type TaskEnvironment map[string]string func NewTaskEnivornment() TaskEnvironment { @@ -70,18 +78,42 @@ func (t TaskEnvironment) SetAllocDir(dir string) { t[AllocDir] = dir } +func (t TaskEnvironment) ClearAllocDir() { + delete(t, AllocDir) +} + +func (t TaskEnvironment) SetTaskLocalDir(dir string) { + t[TaskLocalDir] = dir +} + +func (t TaskEnvironment) ClearTaskLocalDir() { + delete(t, TaskLocalDir) +} + func (t TaskEnvironment) SetMemLimit(limit int) { t[MemLimit] = strconv.Itoa(limit) } +func (t TaskEnvironment) ClearMemLimit() { + delete(t, MemLimit) +} + func (t TaskEnvironment) SetCpuLimit(limit int) { t[CpuLimit] = strconv.Itoa(limit) } +func (t TaskEnvironment) ClearCpuLimit() { + delete(t, CpuLimit) +} + func (t TaskEnvironment) SetTaskIp(ip string) { t[TaskIP] = ip } +func (t TaskEnvironment) ClearTaskIp() { + delete(t, TaskIP) +} + // Takes a map of port labels to their port value. func (t TaskEnvironment) SetPorts(ports map[string]int) { for label, port := range ports { @@ -89,6 +121,14 @@ func (t TaskEnvironment) SetPorts(ports map[string]int) { } } +func (t TaskEnvironment) ClearPorts() { + for k, _ := range t { + if strings.HasPrefix(k, PortPrefix) { + delete(t, k) + } + } +} + // Takes a map of meta values to be passed to the task. The keys are capatilized // when the environent variable is set. func (t TaskEnvironment) SetMeta(m map[string]string) { @@ -97,8 +137,28 @@ func (t TaskEnvironment) SetMeta(m map[string]string) { } } +func (t TaskEnvironment) ClearMeta() { + for k, _ := range t { + if strings.HasPrefix(k, MetaPrefix) { + delete(t, k) + } + } +} + func (t TaskEnvironment) SetEnvvars(m map[string]string) { for k, v := range m { t[k] = v } } + +func (t TaskEnvironment) ClearEnvvars() { +OUTER: + for k, _ := range t { + for _, nomadPrefix := range nomadVars { + if strings.HasPrefix(k, nomadPrefix) { + continue OUTER + } + } + delete(t, k) + } +} diff --git a/client/driver/environment/vars_test.go b/client/driver/environment/vars_test.go index c6cba7756..ac5f30375 100644 --- a/client/driver/environment/vars_test.go +++ b/client/driver/environment/vars_test.go @@ -21,7 +21,7 @@ func TestEnvironment_AsList(t *testing.T) { } } -func TastEnvironment_ParseFromList(t *testing.T) { +func TestEnvironment_ParseFromList(t *testing.T) { input := []string{"foo=bar", "BAZ=baM"} env, err := ParseFromList(input) if err != nil { @@ -29,10 +29,44 @@ func TastEnvironment_ParseFromList(t *testing.T) { } exp := map[string]string{ - "foo": "baz", + "foo": "bar", "BAZ": "baM", } - if !reflect.DeepEqual(env, exp) { - t.Fatalf("ParseFromList(%#v) returned %v; want %v", input, env, exp) + + if len(env) != len(exp) { + t.Fatalf("ParseFromList(%#v) has length %v; want %v", input, len(env), len(exp)) + } + + for k, v := range exp { + if actV, ok := env[k]; !ok { + t.Fatalf("ParseFromList(%#v) doesn't contain expected %v", input, k) + } else if actV != v { + t.Fatalf("ParseFromList(%#v) has incorrect value for %v; got %v; want %v", input, k, actV, v) + } + } +} + +func TestEnvironment_ClearEnvvars(t *testing.T) { + env := NewTaskEnivornment() + env.SetTaskIp("127.0.0.1") + env.SetEnvvars(map[string]string{"foo": "baz", "bar": "bang"}) + + act := env.List() + exp := []string{"NOMAD_IP=127.0.0.1", "bar=bang", "foo=baz"} + sort.Strings(act) + sort.Strings(exp) + if !reflect.DeepEqual(act, exp) { + t.Fatalf("env.List() returned %v; want %v", act, exp) + } + + // Clear the environent variables. + env.ClearEnvvars() + + act = env.List() + exp = []string{"NOMAD_IP=127.0.0.1"} + sort.Strings(act) + sort.Strings(exp) + if !reflect.DeepEqual(act, exp) { + t.Fatalf("env.List() returned %v; want %v", act, exp) } } diff --git a/client/driver/exec.go b/client/driver/exec.go index f7869e94b..0324cad68 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -2,10 +2,15 @@ package driver import ( "fmt" + "log" + "path" + "path/filepath" "runtime" "syscall" "time" + "github.com/hashicorp/go-getter" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/executor" "github.com/hashicorp/nomad/nomad/structs" @@ -41,12 +46,40 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, } func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { - // Get the command + // Get the command to be ran command, ok := task.Config["command"] if !ok || command == "" { return nil, fmt.Errorf("missing command for exec driver") } + // Check if an artificat is specified and attempt to download it + source, ok := task.Config["artifact_source"] + if ok && source != "" { + // Proceed to download an artifact to be executed. + // We use go-getter to support a variety of protocols, but need to change + // file permissions of the resulted download to be executable + + // Create a location to download the artifact. + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + destDir := filepath.Join(taskDir, allocdir.TaskLocal) + + artifactName := path.Base(source) + artifactFile := filepath.Join(destDir, artifactName) + if err := getter.GetFile(artifactFile, source); err != nil { + return nil, fmt.Errorf("Error downloading artifact for Exec driver: %s", err) + } + + // Add execution permissions to the newly downloaded artifact + if runtime.GOOS != "windows" { + if err := syscall.Chmod(artifactFile, 0755); err != nil { + log.Printf("[ERR] driver.Exec: Error making artifact executable: %s", err) + } + } + } + // Get the environment variables. envVars := TaskEnvironmentVariables(ctx, task) diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 4a05d5891..ba8745176 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -5,10 +5,10 @@ import ( "io/ioutil" "path/filepath" "reflect" + "runtime" "testing" "time" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/nomad/structs" @@ -86,7 +86,7 @@ func TestExecDriver_Start_Wait(t *testing.T) { Name: "sleep", Config: map[string]string{ "command": "/bin/sleep", - "args": "1", + "args": "2", }, Resources: basicResources, } @@ -116,11 +116,109 @@ func TestExecDriver_Start_Wait(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - case <-time.After(2 * time.Second): + case <-time.After(4 * time.Second): t.Fatalf("timeout") } } +func TestExecDriver_Start_Artifact_basic(t *testing.T) { + ctestutils.ExecCompatible(t) + var file string + switch runtime.GOOS { + case "darwin": + file = "hi_darwin_amd64" + default: + file = "hi_linux_amd64" + } + + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), + "command": filepath.Join("$NOMAD_TASK_DIR", file), + }, + Resources: basicResources, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + d := NewExecDriver(driverCtx) + + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Update should be a no-op + err = handle.Update(task) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } +} + +func TestExecDriver_Start_Artifact_expanded(t *testing.T) { + ctestutils.ExecCompatible(t) + var file string + switch runtime.GOOS { + case "darwin": + file = "hi_darwin_amd64" + default: + file = "hi_linux_amd64" + } + + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), + "command": "/bin/bash", + "args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)), + }, + Resources: basicResources, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + d := NewExecDriver(driverCtx) + + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Update should be a no-op + err = handle.Update(task) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } +} func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { ctestutils.ExecCompatible(t) @@ -159,7 +257,7 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { } // Check that data was written to the shared alloc directory. - outputFile := filepath.Join(ctx.AllocDir.AllocDir, allocdir.SharedAllocName, file) + outputFile := filepath.Join(ctx.AllocDir.SharedDir, file) act, err := ioutil.ReadFile(outputFile) if err != nil { t.Fatalf("Couldn't read expected output: %v", err) diff --git a/client/driver/java.go b/client/driver/java.go index 198410878..ac2c3c6f3 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -3,9 +3,6 @@ package driver import ( "bytes" "fmt" - "io" - "net/http" - "os" "os/exec" "path" "path/filepath" @@ -14,6 +11,7 @@ import ( "syscall" "time" + "github.com/hashicorp/go-getter" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/executor" @@ -69,7 +67,7 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, } if infoString == "" { - d.logger.Println("[WARN] Error parsing Java version information, aborting") + d.logger.Println("[WARN] driver.java: error parsing Java version information, aborting") return false, nil } @@ -97,44 +95,33 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, fmt.Errorf("missing jar source for Java Jar driver") } - // Attempt to download the thing - // Should be extracted to some kind of Http Fetcher - // Right now, assume publicly accessible HTTP url - resp, err := http.Get(source) - if err != nil { - return nil, fmt.Errorf("Error downloading source for Java driver: %s", err) - } - - // Get the tasks local directory. taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) + + destDir := filepath.Join(taskDir, allocdir.TaskLocal) // Create a location to download the binary. - fName := path.Base(source) - fPath := filepath.Join(taskLocal, fName) - f, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, fmt.Errorf("Error opening file to download to: %s", err) - } - - defer f.Close() - defer resp.Body.Close() - - // Copy remote file to local directory for execution - // TODO: a retry of sort if io.Copy fails, for large binaries - _, ioErr := io.Copy(f, resp.Body) - if ioErr != nil { - return nil, fmt.Errorf("Error copying jar from source: %s", ioErr) + jarName := path.Base(source) + jarPath := filepath.Join(destDir, jarName) + if err := getter.GetFile(jarPath, source); err != nil { + return nil, fmt.Errorf("Error downloading source for Java driver: %s", err) } // Get the environment variables. envVars := TaskEnvironmentVariables(ctx, task) + args := []string{} + // Look for jvm options + jvm_options, ok := task.Config["jvm_options"] + if ok && jvm_options != "" { + d.logger.Printf("[DEBUG] driver.java: found JVM options: %s", jvm_options) + args = append(args, jvm_options) + } + // Build the argument list. - args := []string{"-jar", filepath.Join(allocdir.TaskLocal, fName)} + args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName)) if argRaw, ok := task.Config["args"]; ok { args = append(args, argRaw) } diff --git a/client/driver/java_test.go b/client/driver/java_test.go index f73c95692..ad8f5e578 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -100,6 +100,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) { "jar_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", // "jar_source": "https://s3-us-west-2.amazonaws.com/java-jar-thing/demoapp.jar", // "args": "-d64", + "jvm_options": "-Xmx2048m -Xms256m", }, Resources: basicResources, } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 540dc9068..90b9aa19f 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "log" - "net/http" "os" "os/exec" "path/filepath" @@ -19,6 +18,7 @@ import ( "syscall" "time" + "github.com/hashicorp/go-getter" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" @@ -94,45 +94,25 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, fmt.Errorf("Missing required Task Resource: Memory") } - // Attempt to download the thing - // Should be extracted to some kind of Http Fetcher - // Right now, assume publicly accessible HTTP url - resp, err := http.Get(source) - if err != nil { - return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err) - } - // Get the tasks local directory. taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) - // Create a location in the local directory to download and store the image. - // TODO: Caching + // Create a location to download the binary. + destDir := filepath.Join(taskDir, allocdir.TaskLocal) vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source)) - fPath := filepath.Join(taskLocal, vmID) - vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, fmt.Errorf("Error opening file to download to: %s", err) - } - - defer vmPath.Close() - defer resp.Body.Close() - - // Copy remote file to local AllocDir for execution - // TODO: a retry of sort if io.Copy fails, for large binaries - _, ioErr := io.Copy(vmPath, resp.Body) - if ioErr != nil { - return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr) + vmPath := filepath.Join(destDir, vmID) + if err := getter.GetFile(vmPath, source); err != nil { + return nil, fmt.Errorf("Error downloading artifact for Qemu driver: %s", err) } // compute and check checksum if check, ok := task.Config["checksum"]; ok { d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID) hasher := sha256.New() - file, err := os.Open(vmPath.Name()) + file, err := os.Open(vmPath) if err != nil { return nil, fmt.Errorf("Failed to open file for checksum") } @@ -163,7 +143,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, "-machine", "type=pc,accel=" + accelerator, "-name", vmID, "-m", mem, - "-drive", "file=" + vmPath.Name(), + "-drive", "file=" + vmPath, "-nodefconfig", "-nodefaults", "-nographic", @@ -240,7 +220,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Create and Return Handle h := &qemuHandle{ proc: cmd.Process, - vmID: vmPath.Name(), + vmID: vmPath, doneCh: make(chan struct{}), waitCh: make(chan error, 1), } diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 4c2c2e7d1..e9a9e5744 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -3,7 +3,6 @@ package driver import ( "fmt" "os" - "os/exec" "testing" "github.com/hashicorp/nomad/client/config" @@ -12,14 +11,6 @@ import ( ctestutils "github.com/hashicorp/nomad/client/testutil" ) -// qemuLocated looks to see whether qemu binaries are available on this system -// before we try to run tests. We may need to tweak this for cross-OS support -// but I think this should work on *nix at least. -func qemuLocated() bool { - _, err := exec.Command("qemu-x86_64", "-version").CombinedOutput() - return err == nil -} - func TestQemuDriver_Handle(t *testing.T) { h := &qemuHandle{ proc: &os.Process{Pid: 123}, @@ -58,10 +49,7 @@ func TestQemuDriver_Fingerprint(t *testing.T) { } func TestQemuDriver_Start(t *testing.T) { - if !qemuLocated() { - t.Skip("QEMU not found; skipping") - } - + ctestutils.QemuCompatible(t) // TODO: use test server to load from a fixture task := &structs.Task{ Name: "linux", @@ -110,10 +98,7 @@ func TestQemuDriver_Start(t *testing.T) { } func TestQemuDriver_RequiresMemory(t *testing.T) { - if !qemuLocated() { - t.Skip("QEMU not found; skipping") - } - + ctestutils.QemuCompatible(t) // TODO: use test server to load from a fixture task := &structs.Task{ Name: "linux", @@ -136,5 +121,4 @@ func TestQemuDriver_RequiresMemory(t *testing.T) { if err == nil { t.Fatalf("Expected error when not specifying memory") } - } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go new file mode 100644 index 000000000..fd54e1b86 --- /dev/null +++ b/client/driver/raw_exec.go @@ -0,0 +1,239 @@ +package driver + +import ( + "fmt" + "log" + "os" + "os/exec" + "path" + "path/filepath" + "runtime" + "strconv" + "strings" + "syscall" + "time" + + "github.com/hashicorp/go-getter" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/args" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // The option that enables this driver in the Config.Options map. + rawExecConfigOption = "driver.raw_exec.enable" + + // Null files to use as stdin. + unixNull = "/dev/null" + windowsNull = "nul" +) + +// The RawExecDriver is a privileged version of the exec driver. It provides no +// resource isolation and just fork/execs. The Exec driver should be preferred +// and this should only be used when explicitly needed. +type RawExecDriver struct { + DriverContext +} + +// rawExecHandle is returned from Start/Open as a handle to the PID +type rawExecHandle struct { + proc *os.Process + waitCh chan error + doneCh chan struct{} +} + +// NewRawExecDriver is used to create a new raw exec driver +func NewRawExecDriver(ctx *DriverContext) Driver { + return &RawExecDriver{*ctx} +} + +func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { + // Check that the user has explicitly enabled this executor. + enabled, err := strconv.ParseBool(cfg.ReadDefault(rawExecConfigOption, "false")) + if err != nil { + return false, fmt.Errorf("Failed to parse %v option: %v", rawExecConfigOption, err) + } + + if enabled { + d.logger.Printf("[WARN] driver.raw_exec: raw exec is enabled. Only enable if needed") + node.Attributes["driver.raw_exec"] = "1" + return true, nil + } + + return false, nil +} + +func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { + // Get the tasks local directory. + taskName := d.DriverContext.taskName + taskDir, ok := ctx.AllocDir.TaskDirs[taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) + + // Get the command to be ran + command, ok := task.Config["command"] + if !ok || command == "" { + return nil, fmt.Errorf("missing command for Raw Exec driver") + } + + // Check if an artificat is specified and attempt to download it + source, ok := task.Config["artifact_source"] + if ok && source != "" { + // Proceed to download an artifact to be executed. + // We use go-getter to support a variety of protocols, but need to change + // file permissions of the resulted download to be executable + + // Create a location to download the artifact. + destDir := filepath.Join(taskDir, allocdir.TaskLocal) + + artifactName := path.Base(source) + artifactFile := filepath.Join(destDir, artifactName) + if err := getter.GetFile(artifactFile, source); err != nil { + return nil, fmt.Errorf("Error downloading artifact for Raw Exec driver: %s", err) + } + + // Add execution permissions to the newly downloaded artifact + if runtime.GOOS != "windows" { + if err := syscall.Chmod(artifactFile, 0755); err != nil { + log.Printf("[ERR] driver.raw_exec: Error making artifact executable: %s", err) + } + } + } + + // Get the environment variables. + envVars := TaskEnvironmentVariables(ctx, task) + + // expand NOMAD_TASK_DIR + parsedPath, err := args.ParseAndReplace(command, envVars.Map()) + if err != nil { + return nil, fmt.Errorf("failure to parse arguments in command path: %v", command) + } else if len(parsedPath) != 1 { + return nil, fmt.Errorf("couldn't properly parse command path: %v", command) + } + + cm := parsedPath[0] + + // Look for arguments + var cmdArgs []string + if argRaw, ok := task.Config["args"]; ok { + parsed, err := args.ParseAndReplace(argRaw, envVars.Map()) + if err != nil { + return nil, err + } + cmdArgs = append(cmdArgs, parsed...) + } + + // Setup the command + cmd := exec.Command(cm, cmdArgs...) + cmd.Dir = taskDir + cmd.Env = envVars.List() + + // Capture the stdout/stderr and redirect stdin to /dev/null + stdoutFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stdout", taskName)) + stderrFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stderr", taskName)) + stdinFilename := unixNull + if runtime.GOOS == "windows" { + stdinFilename = windowsNull + } + + stdo, err := os.OpenFile(stdoutFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to redirect stdout: %v", err) + } + + stde, err := os.OpenFile(stderrFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err) + } + + stdi, err := os.OpenFile(stdinFilename, os.O_CREATE|os.O_RDONLY, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to redirect stdin: %v", err) + } + + cmd.Stdout = stdo + cmd.Stderr = stde + cmd.Stdin = stdi + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start command: %v", err) + } + + // Return a driver handle + h := &rawExecHandle{ + proc: cmd.Process, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + go h.run() + return h, nil +} + +func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + // Split the handle + pidStr := strings.TrimPrefix(handleID, "PID:") + pid, err := strconv.Atoi(pidStr) + if err != nil { + return nil, fmt.Errorf("failed to parse handle '%s': %v", handleID, err) + } + + // Find the process + proc, err := os.FindProcess(pid) + if proc == nil || err != nil { + return nil, fmt.Errorf("failed to find PID %d: %v", pid, err) + } + + // Return a driver handle + h := &rawExecHandle{ + proc: proc, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + go h.run() + return h, nil +} + +func (h *rawExecHandle) ID() string { + // Return a handle to the PID + return fmt.Sprintf("PID:%d", h.proc.Pid) +} + +func (h *rawExecHandle) WaitCh() chan error { + return h.waitCh +} + +func (h *rawExecHandle) Update(task *structs.Task) error { + // Update is not possible + return nil +} + +// Kill is used to terminate the task. We send an Interrupt +// and then provide a 5 second grace period before doing a Kill on supported +// OS's, otherwise we kill immediately. +func (h *rawExecHandle) Kill() error { + if runtime.GOOS == "windows" { + return h.proc.Kill() + } + + h.proc.Signal(os.Interrupt) + select { + case <-h.doneCh: + return nil + case <-time.After(5 * time.Second): + return h.proc.Kill() + } +} + +func (h *rawExecHandle) run() { + ps, err := h.proc.Wait() + close(h.doneCh) + if err != nil { + h.waitCh <- err + } else if !ps.Success() { + h.waitCh <- fmt.Errorf("task exited with error") + } + close(h.waitCh) +} diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go new file mode 100644 index 000000000..0a6133df9 --- /dev/null +++ b/client/driver/raw_exec_test.go @@ -0,0 +1,327 @@ +package driver + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "reflect" + "runtime" + "testing" + "time" + + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/environment" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestRawExecDriver_Fingerprint(t *testing.T) { + d := NewRawExecDriver(testDriverContext("")) + node := &structs.Node{ + Attributes: make(map[string]string), + } + + // Disable raw exec. + cfg := &config.Config{Options: map[string]string{rawExecConfigOption: "false"}} + + apply, err := d.Fingerprint(cfg, node) + if err != nil { + t.Fatalf("err: %v", err) + } + if apply { + t.Fatalf("should not apply") + } + if node.Attributes["driver.raw_exec"] != "" { + t.Fatalf("driver incorrectly enabled") + } + + // Enable raw exec. + cfg.Options[rawExecConfigOption] = "true" + apply, err = d.Fingerprint(cfg, node) + if err != nil { + t.Fatalf("err: %v", err) + } + if !apply { + t.Fatalf("should apply") + } + if node.Attributes["driver.raw_exec"] != "1" { + t.Fatalf("driver not enabled") + } +} + +func TestRawExecDriver_StartOpen_Wait(t *testing.T) { + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "command": "/bin/sleep", + "args": "1", + }, + } + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Attempt to open + handle2, err := d.Open(ctx, handle.ID()) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle2 == nil { + t.Fatalf("missing handle") + } + + // Task should terminate quickly + select { + case <-handle2.WaitCh(): + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } + + // Check they are both tracking the same PID. + pid1 := handle.(*rawExecHandle).proc.Pid + pid2 := handle2.(*rawExecHandle).proc.Pid + if pid1 != pid2 { + t.Fatalf("tracking incorrect Pid; %v != %v", pid1, pid2) + } +} + +func TestRawExecDriver_Start_Artifact_basic(t *testing.T) { + var file string + switch runtime.GOOS { + case "darwin": + file = "hi_darwin_amd64" + default: + file = "hi_linux_amd64" + } + + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), + "command": filepath.Join("$NOMAD_TASK_DIR", file), + }, + } + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Attempt to open + handle2, err := d.Open(ctx, handle.ID()) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle2 == nil { + t.Fatalf("missing handle") + } + + // Task should terminate quickly + select { + case <-handle2.WaitCh(): + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } + + // Check they are both tracking the same PID. + pid1 := handle.(*rawExecHandle).proc.Pid + pid2 := handle2.(*rawExecHandle).proc.Pid + if pid1 != pid2 { + t.Fatalf("tracking incorrect Pid; %v != %v", pid1, pid2) + } +} + +func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) { + var file string + switch runtime.GOOS { + case "darwin": + file = "hi_darwin_amd64" + default: + file = "hi_linux_amd64" + } + + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), + "command": "/bin/bash", + "args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)), + }, + } + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Attempt to open + handle2, err := d.Open(ctx, handle.ID()) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle2 == nil { + t.Fatalf("missing handle") + } + + // Task should terminate quickly + select { + case <-handle2.WaitCh(): + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } + + // Check they are both tracking the same PID. + pid1 := handle.(*rawExecHandle).proc.Pid + pid2 := handle2.(*rawExecHandle).proc.Pid + if pid1 != pid2 { + t.Fatalf("tracking incorrect Pid; %v != %v", pid1, pid2) + } +} + +func TestRawExecDriver_Start_Wait(t *testing.T) { + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "command": "/bin/sleep", + "args": "1", + }, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Update should be a no-op + err = handle.Update(task) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } +} + +func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { + exp := []byte{'w', 'i', 'n'} + file := "output.txt" + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "command": "/bin/bash", + "args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file), + }, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } + + // Check that data was written to the shared alloc directory. + outputFile := filepath.Join(ctx.AllocDir.SharedDir, file) + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } + + if !reflect.DeepEqual(act, exp) { + t.Fatalf("Command outputted %v; want %v", act, exp) + } +} + +func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { + task := &structs.Task{ + Name: "sleep", + Config: map[string]string{ + "command": "/bin/sleep", + "args": "1", + }, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + defer ctx.AllocDir.Destroy() + + d := NewRawExecDriver(driverCtx) + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Kill() + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err == nil { + t.Fatal("should err") + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } +} diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 23f4d29ee..456e4e02b 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -7,19 +7,22 @@ import ( "log" "os" "os/exec" + "path/filepath" "regexp" "runtime" "strings" "syscall" "time" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/nomad/structs" ) var ( - reRktVersion = regexp.MustCompile("rkt version ([\\d\\.]+).+") - reAppcVersion = regexp.MustCompile("appc version ([\\d\\.]+).+") + reRktVersion = regexp.MustCompile(`rkt version (\d[.\d]+)`) + reAppcVersion = regexp.MustCompile(`appc version (\d[.\d]+)`) ) // RktDriver is a driver for running images via Rkt @@ -32,7 +35,7 @@ type RktDriver struct { // rktHandle is returned from Start/Open as a handle to the PID type rktHandle struct { proc *os.Process - name string + image string logger *log.Logger waitCh chan error doneCh chan struct{} @@ -41,8 +44,8 @@ type rktHandle struct { // rktPID is a struct to map the pid running the process to the vm image on // disk type rktPID struct { - Pid int - Name string + Pid int + Image string } // NewRktDriver is used to create a new exec driver @@ -64,13 +67,13 @@ func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e out := strings.TrimSpace(string(outBytes)) rktMatches := reRktVersion.FindStringSubmatch(out) - appcMatches := reRktVersion.FindStringSubmatch(out) + appcMatches := reAppcVersion.FindStringSubmatch(out) if len(rktMatches) != 2 || len(appcMatches) != 2 { return false, fmt.Errorf("Unable to parse Rkt version string: %#v", rktMatches) } - node.Attributes["driver.rkt"] = "true" - node.Attributes["driver.rkt.version"] = rktMatches[0] + node.Attributes["driver.rkt"] = "1" + node.Attributes["driver.rkt.version"] = rktMatches[1] node.Attributes["driver.rkt.appc.version"] = appcMatches[1] return true, nil @@ -78,61 +81,104 @@ func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e // Run an existing Rkt image. func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { - trust_prefix, ok := task.Config["trust_prefix"] - if !ok || trust_prefix == "" { - return nil, fmt.Errorf("Missing trust prefix for rkt") + // Validate that the config is valid. + img, ok := task.Config["image"] + if !ok || img == "" { + return nil, fmt.Errorf("Missing ACI image for rkt") } + // Get the tasks local directory. + taskName := d.DriverContext.taskName + taskDir, ok := ctx.AllocDir.TaskDirs[taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) + // Add the given trust prefix - var outBuf, errBuf bytes.Buffer - cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trust_prefix)) - cmd.Stdout = &outBuf - cmd.Stderr = &errBuf - d.logger.Printf("[DEBUG] driver.rkt: starting rkt command: %q", cmd.Args) - if err := cmd.Run(); err != nil { - return nil, fmt.Errorf( - "Error running rkt: %s\n\nOutput: %s\n\nError: %s", - err, outBuf.String(), errBuf.String()) - } - d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trust_prefix) - - name, ok := task.Config["name"] - if !ok || name == "" { - return nil, fmt.Errorf("Missing ACI name for rkt") + trust_prefix, trust_cmd := task.Config["trust_prefix"] + if trust_cmd { + var outBuf, errBuf bytes.Buffer + cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trust_prefix)) + cmd.Stdout = &outBuf + cmd.Stderr = &errBuf + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s", + err, outBuf.String(), errBuf.String()) + } + d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trust_prefix) } - exec_cmd, ok := task.Config["exec"] - if !ok || exec_cmd == "" { - d.logger.Printf("[WARN] driver.rkt: could not find a command to execute in the ACI, the default command will be executed") + // Build the command. + var cmd_args []string + + // Inject the environment variables. + envVars := TaskEnvironmentVariables(ctx, task) + + // Clear the task directories as they are not currently supported. + envVars.ClearTaskLocalDir() + envVars.ClearAllocDir() + + for k, v := range envVars.Map() { + cmd_args = append(cmd_args, fmt.Sprintf("--set-env=%v=%v", k, v)) } - // Run the ACI - var aoutBuf, aerrBuf bytes.Buffer - run_cmd := []string{ - "rkt", - "run", - "--mds-register=false", - name, + // Disble signature verification if the trust command was not run. + if !trust_cmd { + cmd_args = append(cmd_args, "--insecure-skip-verify") } - if exec_cmd != "" { - splitted := strings.Fields(exec_cmd) - run_cmd = append(run_cmd, "--exec=", splitted[0], "--") - run_cmd = append(run_cmd, splitted[1:]...) - run_cmd = append(run_cmd, "---") + + // Append the run command. + cmd_args = append(cmd_args, "run", "--mds-register=false", img) + + // Check if the user has overriden the exec command. + if exec_cmd, ok := task.Config["command"]; ok { + cmd_args = append(cmd_args, fmt.Sprintf("--exec=%v", exec_cmd)) } - acmd := exec.Command(run_cmd[0], run_cmd[1:]...) - acmd.Stdout = &aoutBuf - acmd.Stderr = &aerrBuf - d.logger.Printf("[DEBUG] driver:rkt: starting rkt command: %q", acmd.Args) - if err := acmd.Start(); err != nil { - return nil, fmt.Errorf( - "Error running rkt: %s\n\nOutput: %s\n\nError: %s", - err, aoutBuf.String(), aerrBuf.String()) + + // Add user passed arguments. + if userArgs, ok := task.Config["args"]; ok { + parsed, err := args.ParseAndReplace(userArgs, envVars.Map()) + if err != nil { + return nil, err + } + + // Need to start arguments with "--" + if len(parsed) > 0 { + cmd_args = append(cmd_args, "--") + } + + for _, arg := range parsed { + cmd_args = append(cmd_args, fmt.Sprintf("%v", arg)) + } } - d.logger.Printf("[DEBUG] driver.rkt: started ACI: %q", name) + + // Create files to capture stdin and out. + stdoutFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stdout", taskName)) + stderrFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stderr", taskName)) + + stdo, err := os.OpenFile(stdoutFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to redirect stdout: %v", err) + } + + stde, err := os.OpenFile(stderrFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err) + } + + cmd := exec.Command("rkt", cmd_args...) + cmd.Stdout = stdo + cmd.Stderr = stde + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("Error running rkt: %v", err) + } + + d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmd.Args) h := &rktHandle{ - proc: acmd.Process, - name: name, + proc: cmd.Process, + image: img, logger: d.logger, doneCh: make(chan struct{}), waitCh: make(chan error, 1), @@ -158,7 +204,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error // Return a driver handle h := &rktHandle{ proc: proc, - name: qpid.Name, + image: qpid.Image, logger: d.logger, doneCh: make(chan struct{}), waitCh: make(chan error, 1), @@ -171,8 +217,8 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error func (h *rktHandle) ID() string { // Return a handle to the PID pid := &rktPID{ - Pid: h.proc.Pid, - Name: h.name, + Pid: h.proc.Pid, + Image: h.image, } data, err := json.Marshal(pid) if err != nil { diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index ea42b2268..53ea2a427 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -2,26 +2,44 @@ package driver import ( "fmt" + "io/ioutil" "os" + "path/filepath" "testing" "time" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" ctestutils "github.com/hashicorp/nomad/client/testutil" ) +func TestRktVersionRegex(t *testing.T) { + input_rkt := "rkt version 0.8.1" + input_appc := "appc version 1.2.0" + expected_rkt := "0.8.1" + expected_appc := "1.2.0" + rktMatches := reRktVersion.FindStringSubmatch(input_rkt) + appcMatches := reAppcVersion.FindStringSubmatch(input_appc) + if rktMatches[1] != expected_rkt { + fmt.Printf("Test failed; got %q; want %q\n", rktMatches[1], expected_rkt) + } + if appcMatches[1] != expected_appc { + fmt.Printf("Test failed; got %q; want %q\n", appcMatches[1], expected_appc) + } +} + func TestRktDriver_Handle(t *testing.T) { h := &rktHandle{ proc: &os.Process{Pid: 123}, - name: "foo", + image: "foo", doneCh: make(chan struct{}), waitCh: make(chan error, 1), } actual := h.ID() - expected := `Rkt:{"Pid":123,"Name":"foo"}` + expected := `Rkt:{"Pid":123,"Image":"foo"}` if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) } @@ -41,7 +59,7 @@ func TestRktDriver_Fingerprint(t *testing.T) { if !apply { t.Fatalf("should apply") } - if node.Attributes["driver.rkt"] == "" { + if node.Attributes["driver.rkt"] != "1" { t.Fatalf("Missing Rkt driver") } if node.Attributes["driver.rkt.version"] == "" { @@ -59,8 +77,8 @@ func TestRktDriver_Start(t *testing.T) { Name: "etcd", Config: map[string]string{ "trust_prefix": "coreos.com/etcd", - "name": "coreos.com/etcd:v2.0.4", - "exec": "/etcd --version", + "image": "coreos.com/etcd:v2.0.4", + "command": "/etcd", }, } @@ -98,8 +116,9 @@ func TestRktDriver_Start_Wait(t *testing.T) { Name: "etcd", Config: map[string]string{ "trust_prefix": "coreos.com/etcd", - "name": "coreos.com/etcd:v2.0.4", - "exec": "/etcd --version", + "image": "coreos.com/etcd:v2.0.4", + "command": "/etcd", + "args": "--version", }, } @@ -132,3 +151,94 @@ func TestRktDriver_Start_Wait(t *testing.T) { t.Fatalf("timeout") } } + +func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { + ctestutils.RktCompatible(t) + task := &structs.Task{ + Name: "etcd", + Config: map[string]string{ + "image": "coreos.com/etcd:v2.0.4", + "command": "/etcd", + "args": "--version", + }, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + d := NewRktDriver(driverCtx) + defer ctx.AllocDir.Destroy() + + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + defer handle.Kill() + + // Update should be a no-op + err = handle.Update(task) + if err != nil { + t.Fatalf("err: %v", err) + } + + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } +} + +func TestRktDriver_Start_Wait_Logs(t *testing.T) { + ctestutils.RktCompatible(t) + task := &structs.Task{ + Name: "etcd", + Config: map[string]string{ + "trust_prefix": "coreos.com/etcd", + "image": "coreos.com/etcd:v2.0.4", + "command": "/etcd", + "args": "--version", + }, + } + + driverCtx := testDriverContext(task.Name) + ctx := testDriverExecContext(task, driverCtx) + d := NewRktDriver(driverCtx) + defer ctx.AllocDir.Destroy() + + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + defer handle.Kill() + + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } + + taskDir, ok := ctx.AllocDir.TaskDirs[task.Name] + if !ok { + t.Fatalf("Could not find task directory for task: %v", task) + } + stdout := filepath.Join(taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", task.Name)) + data, err := ioutil.ReadFile(stdout) + if err != nil { + t.Fatalf("Failed to read tasks stdout: %v", err) + } + + if len(data) == 0 { + t.Fatal("Task's stdout is empty") + } +} diff --git a/client/executor/exec_linux.go b/client/executor/exec_linux.go index 3985fb5fe..ceb178063 100644 --- a/client/executor/exec_linux.go +++ b/client/executor/exec_linux.go @@ -112,7 +112,7 @@ func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD // Mount dev dev := filepath.Join(taskDir, "dev") if err := os.Mkdir(dev, 0777); err != nil { - return fmt.Errorf("Mkdir(%v) failed: %v", dev) + return fmt.Errorf("Mkdir(%v) failed: %v", dev, err) } if err := syscall.Mount("", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil { @@ -122,7 +122,7 @@ func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD // Mount proc proc := filepath.Join(taskDir, "proc") if err := os.Mkdir(proc, 0777); err != nil { - return fmt.Errorf("Mkdir(%v) failed: %v", proc) + return fmt.Errorf("Mkdir(%v) failed: %v", proc, err) } if err := syscall.Mount("", proc, "proc", syscall.MS_RDONLY, ""); err != nil { @@ -135,6 +135,7 @@ func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD return err } env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)) + env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)) e.Cmd.Env = env.List() e.alloc = alloc @@ -195,7 +196,11 @@ func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error { e.groups.MemorySwap = int64(-1) } - if resources.CPU > 0.0 { + if resources.CPU != 0 { + if resources.CPU < 2 { + return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU) + } + // Set the relative CPU shares for this cgroup. // The simplest scale is 1 share to 1 MHz so 1024 = 1GHz. This means any // given process will have at least that amount of resources, but likely @@ -261,6 +266,14 @@ func (e *LinuxExecutor) Start() error { return err } + parsedPath, err := args.ParseAndReplace(e.cmd.Path, envVars.Map()) + if err != nil { + return err + } else if len(parsedPath) != 1 { + return fmt.Errorf("couldn't properly parse command path: %v", e.cmd.Path) + } + e.cmd.Path = parsedPath[0] + combined := strings.Join(e.Cmd.Args, " ") parsed, err := args.ParseAndReplace(combined, envVars.Map()) if err != nil { @@ -542,6 +555,11 @@ func (e *LinuxExecutor) destroyCgroup() error { multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err)) continue } + + if _, err := process.Wait(); err != nil { + multierror.Append(errs, fmt.Errorf("Failed to wait Pid %v: %v", pid, err)) + continue + } } // Remove the cgroup. diff --git a/client/executor/exec_universal.go b/client/executor/exec_universal.go index 30e40fd43..6b1977d10 100644 --- a/client/executor/exec_universal.go +++ b/client/executor/exec_universal.go @@ -6,8 +6,11 @@ import ( "fmt" "os" "strconv" + "strings" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/args" + "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/nomad/structs" ) @@ -29,11 +32,37 @@ func (e *UniversalExecutor) Limit(resources *structs.Resources) error { } func (e *UniversalExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error { - // No-op + taskDir, ok := alloc.TaskDirs[taskName] + if !ok { + return fmt.Errorf("Error finding task dir for (%s)", taskName) + } + e.Dir = taskDir return nil } func (e *UniversalExecutor) Start() error { + // Parse the commands arguments and replace instances of Nomad environment + // variables. + envVars, err := environment.ParseFromList(e.cmd.Env) + if err != nil { + return err + } + + parsedPath, err := args.ParseAndReplace(e.cmd.Path, envVars.Map()) + if err != nil { + return err + } else if len(parsedPath) != 1 { + return fmt.Errorf("couldn't properly parse command path: %v", e.cmd.Path) + } + + e.cmd.Path = parsedPath[0] + combined := strings.Join(e.cmd.Args, " ") + parsed, err := args.ParseAndReplace(combined, envVars.Map()) + if err != nil { + return err + } + e.Cmd.Args = parsed + // We don't want to call ourself. We want to call Start on our embedded Cmd return e.cmd.Start() } diff --git a/client/fingerprint/env_aws.go b/client/fingerprint/env_aws.go index d47ee1ba8..839285a1d 100644 --- a/client/fingerprint/env_aws.go +++ b/client/fingerprint/env_aws.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" ) @@ -62,12 +63,12 @@ var ec2InstanceSpeedMap = map[string]int{ "d2.8xlarge": 10000, } -// EnvAWSFingerprint is used to fingerprint the CPU +// EnvAWSFingerprint is used to fingerprint AWS metadata type EnvAWSFingerprint struct { logger *log.Logger } -// NewEnvAWSFingerprint is used to create a CPU fingerprint +// NewEnvAWSFingerprint is used to create a fingerprint from AWS metadata func NewEnvAWSFingerprint(logger *log.Logger) Fingerprint { f := &EnvAWSFingerprint{logger: logger} return f @@ -93,7 +94,8 @@ func (f *EnvAWSFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) // assume 2 seconds is enough time for inside AWS network client := &http.Client{ - Timeout: 2 * time.Second, + Timeout: 2 * time.Second, + Transport: cleanhttp.DefaultTransport(), } keys := []string{ @@ -164,7 +166,8 @@ func isAWS() bool { // assume 2 seconds is enough time for inside AWS network client := &http.Client{ - Timeout: 2 * time.Second, + Timeout: 2 * time.Second, + Transport: cleanhttp.DefaultTransport(), } // Query the metadata url for the ami-id, to veryify we're on AWS @@ -207,7 +210,8 @@ func (f *EnvAWSFingerprint) linkSpeed() int { // assume 2 seconds is enough time for inside AWS network client := &http.Client{ - Timeout: 2 * time.Second, + Timeout: 2 * time.Second, + Transport: cleanhttp.DefaultTransport(), } res, err := client.Get(metadataURL + "instance-type") diff --git a/client/fingerprint/env_gce.go b/client/fingerprint/env_gce.go new file mode 100644 index 000000000..f721fc36a --- /dev/null +++ b/client/fingerprint/env_gce.go @@ -0,0 +1,231 @@ +package fingerprint + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/nomad/structs" +) + +// This is where the GCE metadata server normally resides. We hardcode the +// "instance" path as well since it's the only one we access here. +const DEFAULT_GCE_URL = "http://169.254.169.254/computeMetadata/v1/instance/" + +type GCEMetadataNetworkInterface struct { + AccessConfigs []struct { + ExternalIp string + Type string + } + ForwardedIps []string + Ip string + Network string +} + +type ReqError struct { + StatusCode int +} + +func (e ReqError) Error() string { + return http.StatusText(e.StatusCode) +} + +func lastToken(s string) string { + index := strings.LastIndex(s, "/") + return s[index+1:] +} + +// EnvGCEFingerprint is used to fingerprint GCE metadata +type EnvGCEFingerprint struct { + client *http.Client + logger *log.Logger + metadataURL string +} + +// NewEnvGCEFingerprint is used to create a fingerprint from GCE metadata +func NewEnvGCEFingerprint(logger *log.Logger) Fingerprint { + // Read the internal metadata URL from the environment, allowing test files to + // provide their own + metadataURL := os.Getenv("GCE_ENV_URL") + if metadataURL == "" { + metadataURL = DEFAULT_GCE_URL + } + + // assume 2 seconds is enough time for inside GCE network + client := &http.Client{ + Timeout: 2 * time.Second, + Transport: cleanhttp.DefaultTransport(), + } + + return &EnvGCEFingerprint{ + client: client, + logger: logger, + metadataURL: metadataURL, + } +} + +func (f *EnvGCEFingerprint) Get(attribute string, recursive bool) (string, error) { + reqUrl := f.metadataURL + attribute + if recursive { + reqUrl = reqUrl + "?recursive=true" + } + + parsedUrl, err := url.Parse(reqUrl) + if err != nil { + return "", err + } + + req := &http.Request{ + Method: "GET", + URL: parsedUrl, + Header: http.Header{ + "Metadata-Flavor": []string{"Google"}, + }, + } + + res, err := f.client.Do(req) + if err != nil { + return "", err + } + + resp, err := ioutil.ReadAll(res.Body) + res.Body.Close() + if err != nil { + f.logger.Printf("[ERR]: fingerprint.env_gce: Error reading response body for GCE %s", attribute) + return "", err + } + + if res.StatusCode >= 400 { + return "", ReqError{res.StatusCode} + } + + return string(resp), nil +} + +func checkError(err error, logger *log.Logger, desc string) error { + // If it's a URL error, assume we're not actually in an GCE environment. + // To the outer layers, this isn't an error so return nil. + if _, ok := err.(*url.Error); ok { + logger.Printf("[ERR] fingerprint.env_gce: Error querying GCE " + desc + ", skipping") + return nil + } + // Otherwise pass the error through. + return err +} + +func (f *EnvGCEFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { + if !f.isGCE() { + return false, nil + } + + if node.Links == nil { + node.Links = make(map[string]string) + } + + keys := []string{ + "hostname", + "id", + "cpu-platform", + "scheduling/automatic-restart", + "scheduling/on-host-maintenance", + } + for _, k := range keys { + value, err := f.Get(k, false) + if err != nil { + return false, checkError(err, f.logger, k) + } + + // assume we want blank entries + key := strings.Replace(k, "/", ".", -1) + node.Attributes["platform.gce."+key] = strings.Trim(string(value), "\n") + } + + // These keys need everything before the final slash removed to be usable. + keys = []string{ + "machine-type", + "zone", + } + for _, k := range keys { + value, err := f.Get(k, false) + if err != nil { + return false, checkError(err, f.logger, k) + } + + node.Attributes["platform.gce."+k] = strings.Trim(lastToken(value), "\n") + } + + // Get internal and external IPs (if they exist) + value, err := f.Get("network-interfaces/", true) + var interfaces []GCEMetadataNetworkInterface + if err := json.Unmarshal([]byte(value), &interfaces); err != nil { + f.logger.Printf("[WARN] fingerprint.env_gce: Error decoding network interface information: %s", err.Error()) + } + + for _, intf := range interfaces { + prefix := "platform.gce.network." + lastToken(intf.Network) + node.Attributes[prefix] = "true" + node.Attributes[prefix+".ip"] = strings.Trim(intf.Ip, "\n") + for index, accessConfig := range intf.AccessConfigs { + node.Attributes[prefix+".external-ip."+strconv.Itoa(index)] = accessConfig.ExternalIp + } + } + + var tagList []string + value, err = f.Get("tags", false) + if err != nil { + return false, checkError(err, f.logger, "tags") + } + if err := json.Unmarshal([]byte(value), &tagList); err != nil { + f.logger.Printf("[WARN] fingerprint.env_gce: Error decoding instance tags: %s", err.Error()) + } + for _, tag := range tagList { + node.Attributes["platform.gce.tag."+tag] = "true" + } + + var attrDict map[string]string + value, err = f.Get("attributes/", true) + if err != nil { + return false, checkError(err, f.logger, "attributes/") + } + if err := json.Unmarshal([]byte(value), &attrDict); err != nil { + f.logger.Printf("[WARN] fingerprint.env_gce: Error decoding instance attributes: %s", err.Error()) + } + for k, v := range attrDict { + node.Attributes["platform.gce.attr."+k] = strings.Trim(v, "\n") + } + + // populate Links + node.Links["gce"] = node.Attributes["platform.gce.id"] + + return true, nil +} + +func (f *EnvGCEFingerprint) isGCE() bool { + // TODO: better way to detect GCE? + + // Query the metadata url for the machine type, to verify we're on GCE + machineType, err := f.Get("machine-type", false) + if err != nil { + if re, ok := err.(ReqError); !ok || re.StatusCode != 404 { + // If it wasn't a 404 error, print an error message. + f.logger.Printf("[ERR] fingerprint.env_gce: Error querying GCE Metadata URL, skipping") + } + return false + } + + match, err := regexp.MatchString("projects/.+/machineTypes/.+", machineType) + if !match { + return false + } + + return true +} diff --git a/client/fingerprint/env_gce_test.go b/client/fingerprint/env_gce_test.go new file mode 100644 index 000000000..159563576 --- /dev/null +++ b/client/fingerprint/env_gce_test.go @@ -0,0 +1,193 @@ +package fingerprint + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestGCEFingerprint_nonGCE(t *testing.T) { + os.Setenv("GCE_ENV_URL", "http://127.0.0.1/computeMetadata/v1/instance/") + f := NewEnvGCEFingerprint(testLogger()) + node := &structs.Node{ + Attributes: make(map[string]string), + } + + ok, err := f.Fingerprint(&config.Config{}, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + if ok { + t.Fatalf("Should be false without test server") + } +} + +func testFingerprint_GCE(t *testing.T, withExternalIp bool) { + node := &structs.Node{ + Attributes: make(map[string]string), + } + + // configure mock server with fixture routes, data + routes := routes{} + if err := json.Unmarshal([]byte(GCE_routes), &routes); err != nil { + t.Fatalf("Failed to unmarshal JSON in GCE ENV test: %s", err) + } + networkEndpoint := &endpoint{ + Uri: "/computeMetadata/v1/instance/network-interfaces/?recursive=true", + ContentType: "application/json", + } + if withExternalIp { + networkEndpoint.Body = `[{"accessConfigs":[{"externalIp":"104.44.55.66","type":"ONE_TO_ONE_NAT"},{"externalIp":"104.44.55.67","type":"ONE_TO_ONE_NAT"}],"forwardedIps":[],"ip":"10.240.0.5","network":"projects/555555/networks/default"}]` + } else { + networkEndpoint.Body = `[{"accessConfigs":[],"forwardedIps":[],"ip":"10.240.0.5","network":"projects/555555/networks/default"}]` + } + routes.Endpoints = append(routes.Endpoints, networkEndpoint) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + value, ok := r.Header["Metadata-Flavor"] + if !ok { + t.Fatal("Metadata-Flavor not present in HTTP request header") + } + if value[0] != "Google" { + t.Fatalf("Expected Metadata-Flavor Google, saw %s", value[0]) + } + + found := false + for _, e := range routes.Endpoints { + if r.RequestURI == e.Uri { + w.Header().Set("Content-Type", e.ContentType) + fmt.Fprintln(w, e.Body) + } + found = true + } + + if !found { + w.WriteHeader(404) + } + })) + defer ts.Close() + os.Setenv("GCE_ENV_URL", ts.URL+"/computeMetadata/v1/instance/") + f := NewEnvGCEFingerprint(testLogger()) + + ok, err := f.Fingerprint(&config.Config{}, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !ok { + t.Fatalf("should apply") + } + + keys := []string{ + "platform.gce.id", + "platform.gce.hostname", + "platform.gce.zone", + "platform.gce.machine-type", + "platform.gce.zone", + "platform.gce.tag.abc", + "platform.gce.tag.def", + "platform.gce.attr.ghi", + "platform.gce.attr.jkl", + } + + for _, k := range keys { + assertNodeAttributeContains(t, node, k) + } + + if len(node.Links) == 0 { + t.Fatalf("Empty links for Node in GCE Fingerprint test") + } + + // Make sure Links contains the GCE ID. + for _, k := range []string{"gce"} { + assertNodeLinksContains(t, node, k) + } + + assertNodeAttributeEquals(t, node, "platform.gce.id", "12345") + assertNodeAttributeEquals(t, node, "platform.gce.hostname", "instance-1.c.project.internal") + assertNodeAttributeEquals(t, node, "platform.gce.zone", "us-central1-f") + assertNodeAttributeEquals(t, node, "platform.gce.machine-type", "n1-standard-1") + assertNodeAttributeEquals(t, node, "platform.gce.network.default", "true") + assertNodeAttributeEquals(t, node, "platform.gce.network.default.ip", "10.240.0.5") + if withExternalIp { + assertNodeAttributeEquals(t, node, "platform.gce.network.default.external-ip.0", "104.44.55.66") + assertNodeAttributeEquals(t, node, "platform.gce.network.default.external-ip.1", "104.44.55.67") + } else if _, ok := node.Attributes["platform.gce.network.default.external-ip.0"]; ok { + t.Fatal("platform.gce.network.default.external-ip is set without an external IP") + } + + assertNodeAttributeEquals(t, node, "platform.gce.scheduling.automatic-restart", "TRUE") + assertNodeAttributeEquals(t, node, "platform.gce.scheduling.on-host-maintenance", "MIGRATE") + assertNodeAttributeEquals(t, node, "platform.gce.cpu-platform", "Intel Ivy Bridge") + assertNodeAttributeEquals(t, node, "platform.gce.tag.abc", "true") + assertNodeAttributeEquals(t, node, "platform.gce.tag.def", "true") + assertNodeAttributeEquals(t, node, "platform.gce.attr.ghi", "111") + assertNodeAttributeEquals(t, node, "platform.gce.attr.jkl", "222") +} + +const GCE_routes = ` +{ + "endpoints": [ + { + "uri": "/computeMetadata/v1/instance/id", + "content-type": "text/plain", + "body": "12345" + }, + { + "uri": "/computeMetadata/v1/instance/hostname", + "content-type": "text/plain", + "body": "instance-1.c.project.internal" + }, + { + "uri": "/computeMetadata/v1/instance/zone", + "content-type": "text/plain", + "body": "projects/555555/zones/us-central1-f" + }, + { + "uri": "/computeMetadata/v1/instance/machine-type", + "content-type": "text/plain", + "body": "projects/555555/machineTypes/n1-standard-1" + }, + { + "uri": "/computeMetadata/v1/instance/tags", + "content-type": "application/json", + "body": "[\"abc\", \"def\"]" + }, + { + "uri": "/computeMetadata/v1/instance/attributes/?recursive=true", + "content-type": "application/json", + "body": "{\"ghi\":\"111\",\"jkl\":\"222\"}" + }, + { + "uri": "/computeMetadata/v1/instance/scheduling/automatic-restart", + "content-type": "text/plain", + "body": "TRUE" + }, + { + "uri": "/computeMetadata/v1/instance/scheduling/on-host-maintenance", + "content-type": "text/plain", + "body": "MIGRATE" + }, + { + "uri": "/computeMetadata/v1/instance/cpu-platform", + "content-type": "text/plain", + "body": "Intel Ivy Bridge" + } + ] +} +` + +func TestFingerprint_GCEWithExternalIp(t *testing.T) { + testFingerprint_GCE(t, true) +} + +func TestFingerprint_GCEWithoutExternalIp(t *testing.T) { + testFingerprint_GCE(t, false) +} diff --git a/client/fingerprint/fingerprint.go b/client/fingerprint/fingerprint.go index ce69a8ac9..4a42057b2 100644 --- a/client/fingerprint/fingerprint.go +++ b/client/fingerprint/fingerprint.go @@ -18,6 +18,7 @@ var BuiltinFingerprints = []string{ "storage", "network", "env_aws", + "env_gce", } // builtinFingerprintMap contains the built in registered fingerprints @@ -30,6 +31,7 @@ var builtinFingerprintMap = map[string]Factory{ "storage": NewStorageFingerprint, "network": NewNetworkFingerprinter, "env_aws": NewEnvAWSFingerprint, + "env_gce": NewEnvGCEFingerprint, } // NewFingerprint is used to instantiate and return a new fingerprint diff --git a/client/fingerprint/host.go b/client/fingerprint/host.go index 5cbfee755..ac7a347f2 100644 --- a/client/fingerprint/host.go +++ b/client/fingerprint/host.go @@ -5,6 +5,7 @@ import ( "log" "os/exec" "runtime" + "strings" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" @@ -40,7 +41,7 @@ func (f *HostFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (b if err != nil { return false, fmt.Errorf("Failed to run uname: %s", err) } - node.Attributes["kernel.version"] = string(out) + node.Attributes["kernel.version"] = strings.Trim(string(out), "\n") } node.Attributes["hostname"] = hostInfo.Hostname diff --git a/client/fingerprint/network_unix.go b/client/fingerprint/network_unix.go index 6a7925384..020c99516 100644 --- a/client/fingerprint/network_unix.go +++ b/client/fingerprint/network_unix.go @@ -33,25 +33,36 @@ func NewNetworkFingerprinter(logger *log.Logger) Fingerprint { func (f *NetworkFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { // newNetwork is populated and addded to the Nodes resources newNetwork := &structs.NetworkResource{} + defaultDevice := "" - // eth0 is the default device for Linux, and en0 is default for OS X - defaultDevice := "eth0" - if "darwin" == runtime.GOOS { - defaultDevice = "en0" - } - // User-defined override for the default interface + // 1. Use user-defined network device + // 2. Use first interface found in the system for non-dev mode. (dev mode uses lo by default.) if cfg.NetworkInterface != "" { defaultDevice = cfg.NetworkInterface + } else { + + intfs, err := net.Interfaces() + if err != nil { + return false, err + } + + for _, i := range intfs { + if (i.Flags&net.FlagUp != 0) && (i.Flags&(net.FlagLoopback|net.FlagPointToPoint) == 0) { + if ip := f.ipAddress(i.Name); ip != "" { + defaultDevice = i.Name + node.Attributes["network.ip-address"] = ip + newNetwork.IP = ip + newNetwork.CIDR = newNetwork.IP + "/32" + break + } + } + } } - newNetwork.Device = defaultDevice - - if ip := f.ipAddress(defaultDevice); ip != "" { - node.Attributes["network.ip-address"] = ip - newNetwork.IP = ip - newNetwork.CIDR = newNetwork.IP + "/32" + if defaultDevice != "" { + newNetwork.Device = defaultDevice } else { - return false, fmt.Errorf("Unable to determine IP on network interface %v", defaultDevice) + return false, fmt.Errorf("Unable to find any network interface which has IP address") } if throughput := f.linkSpeed(defaultDevice); throughput > 0 { diff --git a/client/testutil/driver_compatible.go b/client/testutil/driver_compatible.go index b4cdfba9f..df1d27d11 100644 --- a/client/testutil/driver_compatible.go +++ b/client/testutil/driver_compatible.go @@ -14,8 +14,13 @@ func ExecCompatible(t *testing.T) { } func QemuCompatible(t *testing.T) { - if runtime.GOOS != "windows" && syscall.Geteuid() != 0 { - t.Skip("Must be root on non-windows environments to run test") + if runtime.GOOS == "windows" { + t.Skip("Must be on non-windows environments to run test") + } + // else see if qemu exists + _, err := exec.Command("qemu-system-x86_64", "-version").CombinedOutput() + if err != nil { + t.Skip("Must have Qemu installed for Qemu specific tests to run") } } diff --git a/command/agent/config.go b/command/agent/config.go index 24f45f955..9b10f2348 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -220,6 +220,7 @@ func DevConfig() *Config { conf.DevMode = true conf.EnableDebug = true conf.DisableAnonymousSignature = true + conf.Client.NetworkInterface = "lo" return conf } diff --git a/command/node_status.go b/command/node_status.go index cf58de8d4..b3cba519c 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "sort" "strings" ) @@ -103,6 +104,20 @@ func (c *NodeStatusCommand) Run(args []string) int { return 1 } + m := node.Attributes + keys := make([]string, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + var attributes []string + for _, k := range keys { + if k != "" { + attributes = append(attributes, fmt.Sprintf("%s:%s", k, m[k])) + } + } + // Format the output basic := []string{ fmt.Sprintf("ID|%s", node.ID), @@ -111,6 +126,7 @@ func (c *NodeStatusCommand) Run(args []string) int { fmt.Sprintf("Datacenter|%s", node.Datacenter), fmt.Sprintf("Drain|%v", node.Drain), fmt.Sprintf("Status|%s", node.Status), + fmt.Sprintf("Attributes|%s", strings.Join(attributes, ", ")), } var allocs []string diff --git a/command/server_members.go b/command/server_members.go index 779de332d..2129f2375 100644 --- a/command/server_members.go +++ b/command/server_members.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "sort" "strings" "github.com/hashicorp/nomad/api" @@ -68,6 +69,9 @@ func (c *ServerMembersCommand) Run(args []string) int { return 1 } + // Sort the members + sort.Sort(api.AgentMembersNameSort(mem)) + // Format the list var out []string if detailed { diff --git a/demo/vagrant/Vagrantfile b/demo/vagrant/Vagrantfile index 8dc432ef7..ee278c43b 100644 --- a/demo/vagrant/Vagrantfile +++ b/demo/vagrant/Vagrantfile @@ -4,7 +4,7 @@ $script = <