diff --git a/CHANGELOG.md b/CHANGELOG.md index 428d401d5..b43855b93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ BACKWARDS INCOMPATIBILITIES: * core: Removed weight and hard/soft fields in constraints [GH-351] * drivers: Qemu and Java driver configurations have been updated to both use `artifact_source` as the source for external images/jars to be ran + * New reserved and dynamic port specification [GH-415] FEATURES: diff --git a/api/tasks.go b/api/tasks.go index 2990b5433..c378c222d 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -27,7 +27,7 @@ type ServiceCheck struct { Name string Type string Script string - Http string + Path string Protocol string Interval time.Duration Timeout time.Duration diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d723478db..0bee8103a 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -33,9 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { - config *config.Config - updater AllocStateUpdater - logger *log.Logger + config *config.Config + updater AllocStateUpdater + logger *log.Logger + consulClient *ConsulClient alloc *structs.Allocation @@ -66,18 +67,20 @@ type allocRunnerState struct { } // NewAllocRunner is used to create a new allocation context -func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner { +func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, + alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner { ar := &AllocRunner{ - config: config, - updater: updater, - logger: logger, - alloc: alloc, - dirtyCh: make(chan struct{}, 1), - tasks: make(map[string]*TaskRunner), - restored: make(map[string]struct{}), - updateCh: make(chan *structs.Allocation, 8), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), + config: config, + updater: updater, + logger: logger, + alloc: alloc, + consulClient: consulClient, + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 8), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), } return ar } @@ -109,7 +112,8 @@ func (r *AllocRunner) RestoreState() error { task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.consulClient) r.tasks[name] = tr // Skip tasks in terminal states. @@ -320,7 +324,8 @@ func (r *AllocRunner) Run() { task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.consulClient) r.tasks[task.Name] = tr go tr.Run() } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 9abe6b8a2..bc4a7aa4f 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,12 +31,13 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() + consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500") if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} } - ar := NewAllocRunner(logger, conf, upd.Update, alloc) + ar := NewAllocRunner(logger, conf, upd.Update, alloc, consulClient) return upd, ar } @@ -64,7 +65,7 @@ func TestAllocRunner_Destroy(t *testing.T) { // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = "10" + task.Config["args"] = []string{"10"} go ar.Run() start := time.Now() @@ -96,7 +97,7 @@ func TestAllocRunner_Update(t *testing.T) { // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = "10" + task.Config["args"] = []string{"10"} go ar.Run() defer ar.Destroy() start := time.Now() @@ -129,7 +130,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = "10" + task.Config["args"] = []string{"10"} go ar.Run() defer ar.Destroy() @@ -141,8 +142,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner + consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500") ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}) + &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) diff --git a/client/client.go b/client/client.go index 1255e7d10..040d80bbf 100644 --- a/client/client.go +++ b/client/client.go @@ -70,6 +70,8 @@ type Client struct { logger *log.Logger + consulClient *ConsulClient + lastServer net.Addr lastRPCTime time.Time lastServerLock sync.Mutex @@ -96,14 +98,22 @@ func NewClient(cfg *config.Config) (*Client, error) { // Create a logger logger := log.New(cfg.LogOutput, "", log.LstdFlags) + // Create the consul client + consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500") + consulClient, err := NewConsulClient(logger, consulAddr) + if err != nil { + return nil, fmt.Errorf("failed to create the consul client: %v", err) + } + // Create the client c := &Client{ - config: cfg, - start: time.Now(), - connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), - logger: logger, - allocs: make(map[string]*AllocRunner), - shutdownCh: make(chan struct{}), + config: cfg, + start: time.Now(), + consulClient: consulClient, + connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), + logger: logger, + allocs: make(map[string]*AllocRunner), + shutdownCh: make(chan struct{}), } // Initialize the client @@ -136,6 +146,9 @@ func NewClient(cfg *config.Config) (*Client, error) { // Start the client! go c.run() + + // Start the consul client + go c.consulClient.SyncWithConsul() return c, nil } @@ -200,6 +213,9 @@ func (c *Client) Shutdown() error { } } + // Stop the consul client + c.consulClient.ShutDown() + c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() @@ -335,7 +351,7 @@ func (c *Client) restoreState() error { for _, entry := range list { id := entry.Name() alloc := &structs.Allocation{ID: id} - ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc) + ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient) c.allocs[id] = ar if err := ar.RestoreState(); err != nil { c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err) @@ -749,7 +765,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { func (c *Client) addAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() defer c.allocLock.Unlock() - ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc) + ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient) c.allocs[alloc.ID] = ar go ar.Run() return nil diff --git a/client/client_test.go b/client/client_test.go index 90fecad6b..935c63dea 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -336,7 +336,7 @@ func TestClient_SaveRestoreState(t *testing.T) { alloc1.NodeID = c1.Node().ID task := alloc1.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = "10" + task.Config["args"] = []string{"10"} state := s1.State() err := state.UpsertAllocs(100, diff --git a/client/consul.go b/client/consul.go new file mode 100644 index 000000000..a87c9dad1 --- /dev/null +++ b/client/consul.go @@ -0,0 +1,201 @@ +package client + +import ( + "fmt" + consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/structs" + "log" + "net/url" + "sync" + "time" +) + +const ( + syncInterval = 5 * time.Second +) + +type trackedService struct { + allocId string + task *structs.Task + service *structs.Service +} + +type ConsulClient struct { + client *consul.Client + logger *log.Logger + shutdownCh chan struct{} + + trackedServices map[string]*trackedService + trackedSrvLock sync.Mutex +} + +func NewConsulClient(logger *log.Logger, consulAddr string) (*ConsulClient, error) { + var err error + var c *consul.Client + cfg := consul.DefaultConfig() + cfg.Address = consulAddr + if c, err = consul.NewClient(cfg); err != nil { + return nil, err + } + + consulClient := ConsulClient{ + client: c, + logger: logger, + trackedServices: make(map[string]*trackedService), + shutdownCh: make(chan struct{}), + } + + return &consulClient, nil +} + +func (c *ConsulClient) Register(task *structs.Task, allocID string) error { + var mErr multierror.Error + for _, service := range task.Services { + c.logger.Printf("[INFO] Registering service %s with Consul.", service.Name) + if err := c.registerService(service, task, allocID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + ts := &trackedService{ + allocId: allocID, + task: task, + service: service, + } + c.trackedSrvLock.Lock() + c.trackedServices[service.Id] = ts + c.trackedSrvLock.Unlock() + } + + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) Deregister(task *structs.Task) error { + var mErr multierror.Error + for _, service := range task.Services { + c.logger.Printf("[INFO] De-Registering service %v with Consul", service.Name) + if err := c.deregisterService(service.Id); err != nil { + c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name) + mErr.Errors = append(mErr.Errors, err) + } + c.trackedSrvLock.Lock() + delete(c.trackedServices, service.Id) + c.trackedSrvLock.Unlock() + } + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) ShutDown() { + close(c.shutdownCh) +} + +func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.Task) (string, int) { + for _, network := range task.Resources.Networks { + if p, ok := network.MapLabelToValues()[portLabel]; ok { + return network.IP, p + } + } + return "", 0 +} + +func (c *ConsulClient) SyncWithConsul() { + sync := time.After(syncInterval) + agent := c.client.Agent() + + for { + select { + case <-sync: + sync = time.After(syncInterval) + var consulServices map[string]*consul.AgentService + var err error + + // Get the list of the services that Consul knows about + if consulServices, err = agent.Services(); err != nil { + c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err) + continue + } + + // See if we have services that Consul doesn't know about yet. + // Register with Consul the services which are not registered + for serviceId := range c.trackedServices { + if _, ok := consulServices[serviceId]; !ok { + ts := c.trackedServices[serviceId] + c.registerService(ts.service, ts.task, ts.allocId) + } + } + + // See if consul thinks we have some services which are not running + // anymore on the node. We de-register those services + for serviceId := range consulServices { + if serviceId == "consul" { + continue + } + if _, ok := c.trackedServices[serviceId]; !ok { + if err := c.deregisterService(serviceId); err != nil { + c.logger.Printf("[DEBUG] Error while de-registering service with ID: %s", serviceId) + } + } + } + case <-c.shutdownCh: + c.logger.Printf("[INFO] Shutting down Consul Client") + return + } + } +} + +func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error { + var mErr multierror.Error + service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) + host, port := c.findPortAndHostForLabel(service.PortLabel, task) + if host == "" || port == 0 { + return fmt.Errorf("The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name) + } + checks := c.makeChecks(service, host, port) + asr := &consul.AgentServiceRegistration{ + ID: service.Id, + Name: service.Name, + Tags: service.Tags, + Port: port, + Address: host, + Checks: checks, + } + if err := c.client.Agent().ServiceRegister(asr); err != nil { + c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", service.Name, err) + mErr.Errors = append(mErr.Errors, err) + } + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) deregisterService(serviceId string) error { + if err := c.client.Agent().ServiceDeregister(serviceId); err != nil { + return err + } + return nil +} + +func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck { + var checks []*consul.AgentServiceCheck + for _, check := range service.Checks { + c := &consul.AgentServiceCheck{ + Interval: check.Interval.String(), + Timeout: check.Timeout.String(), + } + switch check.Type { + case structs.ServiceCheckHTTP: + if check.Protocol == "" { + check.Protocol = "http" + } + url := url.URL{ + Scheme: check.Protocol, + Host: fmt.Sprintf("%s:%d", ip, port), + Path: check.Path, + } + c.HTTP = url.String() + case structs.ServiceCheckTCP: + c.TCP = fmt.Sprintf("%s:%d", ip, port) + case structs.ServiceCheckScript: + c.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types + } + checks = append(checks, c) + } + return checks +} diff --git a/client/consul_test.go b/client/consul_test.go new file mode 100644 index 000000000..fb844859e --- /dev/null +++ b/client/consul_test.go @@ -0,0 +1,53 @@ +package client + +import ( + "github.com/hashicorp/nomad/nomad/structs" + "log" + "os" + "testing" + "time" +) + +func TestMakeChecks(t *testing.T) { + service := &structs.Service{ + Id: "Foo", + Name: "Bar", + Checks: []structs.ServiceCheck{ + { + Type: "http", + Path: "/foo/bar", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + }, + { + Type: "http", + Protocol: "https", + Path: "/foo/bar", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + }, + { + Type: "tcp", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + }, + }, + } + + logger := log.New(os.Stdout, "logger: ", log.Lshortfile) + + c, _ := NewConsulClient(logger, "") + checks := c.makeChecks(service, "10.10.0.1", 8090) + + if checks[0].HTTP != "http://10.10.0.1:8090/foo/bar" { + t.Fatalf("Invalid http url for check: %v", checks[0].HTTP) + } + + if checks[1].HTTP != "https://10.10.0.1:8090/foo/bar" { + t.Fatalf("Invalid http url for check: %v", checks[0].HTTP) + } + + if checks[2].TCP != "10.10.0.1:8090" { + t.Fatalf("Invalid tcp check: %v", checks[0].TCP) + } +} diff --git a/client/driver/args/args.go b/client/driver/args/args.go index 51793bd8b..9e8a9980c 100644 --- a/client/driver/args/args.go +++ b/client/driver/args/args.go @@ -1,11 +1,6 @@ package args -import ( - "fmt" - "regexp" - - "github.com/mattn/go-shellwords" -) +import "regexp" var ( envRe = regexp.MustCompile(`\$({[a-zA-Z0-9_]+}|[a-zA-Z0-9_]+)`) @@ -13,27 +8,17 @@ var ( // ParseAndReplace takes the user supplied args and a map of environment // variables. It replaces any instance of an environment variable in the args -// with the actual value and does correct splitting of the arg list. -func ParseAndReplace(args string, env map[string]string) ([]string, error) { - // Set up parser. - p := shellwords.NewParser() - p.ParseEnv = false - p.ParseBacktick = false - - parsed, err := p.Parse(args) - if err != nil { - return nil, fmt.Errorf("Couldn't parse args %v: %v", args, err) - } - - replaced := make([]string, len(parsed)) - for i, arg := range parsed { +// with the actual value. +func ParseAndReplace(args []string, env map[string]string) []string { + replaced := make([]string, len(args)) + for i, arg := range args { replaced[i] = ReplaceEnv(arg, env) } - return replaced, nil + return replaced } -// replaceEnv takes an arg and replaces all occurences of environment variables. +// ReplaceEnv takes an arg and replaces all occurences of environment variables. // If the variable is found in the passed map it is replaced, otherwise the // original string is returned. func ReplaceEnv(arg string, env map[string]string) string { diff --git a/client/driver/args/args_test.go b/client/driver/args/args_test.go index 4e1d88b59..5e7cbca4a 100644 --- a/client/driver/args/args_test.go +++ b/client/driver/args/args_test.go @@ -21,12 +21,9 @@ var ( ) func TestDriverArgs_ParseAndReplaceInvalidEnv(t *testing.T) { - input := "invalid $FOO" + input := []string{"invalid", "$FOO"} exp := []string{"invalid", "$FOO"} - act, err := ParseAndReplace(input, envVars) - if err != nil { - t.Fatalf("Failed to parse valid input args %v: %v", input, err) - } + act := ParseAndReplace(input, envVars) if !reflect.DeepEqual(act, exp) { t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp) @@ -34,12 +31,9 @@ func TestDriverArgs_ParseAndReplaceInvalidEnv(t *testing.T) { } func TestDriverArgs_ParseAndReplaceValidEnv(t *testing.T) { - input := fmt.Sprintf("nomad_ip \\\"$%v\\\"!", ipKey) + input := []string{"nomad_ip", fmt.Sprintf(`"$%v"!`, ipKey)} exp := []string{"nomad_ip", fmt.Sprintf("\"%s\"!", ipVal)} - act, err := ParseAndReplace(input, envVars) - if err != nil { - t.Fatalf("Failed to parse valid input args %v: %v", input, err) - } + act := ParseAndReplace(input, envVars) if !reflect.DeepEqual(act, exp) { t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp) @@ -47,32 +41,9 @@ func TestDriverArgs_ParseAndReplaceValidEnv(t *testing.T) { } func TestDriverArgs_ParseAndReplaceChainedEnv(t *testing.T) { - input := fmt.Sprintf("-foo $%s$%s", ipKey, portKey) + input := []string{"-foo", fmt.Sprintf("$%s$%s", ipKey, portKey)} exp := []string{"-foo", fmt.Sprintf("%s%s", ipVal, portVal)} - act, err := ParseAndReplace(input, envVars) - if err != nil { - t.Fatalf("Failed to parse valid input args %v: %v", input, err) - } - - if !reflect.DeepEqual(act, exp) { - t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp) - } -} - -func TestDriverArgs_ParseAndReplaceInvalidArgEscape(t *testing.T) { - input := "-c \"echo \"foo\\\" > bar.txt\"" - if _, err := ParseAndReplace(input, envVars); err == nil { - t.Fatalf("ParseAndReplace(%v, %v) should have failed", input, envVars) - } -} - -func TestDriverArgs_ParseAndReplaceValidArgEscape(t *testing.T) { - input := "-c \"echo \\\"foo\\\" > bar.txt\"" - exp := []string{"-c", "echo \"foo\" > bar.txt"} - act, err := ParseAndReplace(input, envVars) - if err != nil { - t.Fatalf("Failed to parse valid input args %v: %v", input, err) - } + act := ParseAndReplace(input, envVars) if !reflect.DeepEqual(act, exp) { t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp) diff --git a/client/driver/docker.go b/client/driver/docker.go index ab0ac96e0..4aa97e13b 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -35,7 +35,7 @@ type DockerDriverAuth struct { type DockerDriverConfig struct { ImageName string `mapstructure:"image"` // Container's Image Name Command string `mapstructure:"command"` // The Command/Entrypoint to run when the container starts up - Args string `mapstructure:"args"` // The arguments to the Command/Entrypoint + Args []string `mapstructure:"args"` // The arguments to the Command/Entrypoint NetworkMode string `mapstructure:"network_mode"` // The network mode of the container - host, net and none PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and the ports exposed on the container Privileged bool `mapstructure:"privileged"` // Flag to run the container in priviledged mode @@ -293,21 +293,18 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri config.ExposedPorts = exposedPorts } - parsedArgs, err := args.ParseAndReplace(driverConfig.Args, env.Map()) - if err != nil { - return c, err - } + parsedArgs := args.ParseAndReplace(driverConfig.Args, env.Map()) // If the user specified a custom command to run as their entrypoint, we'll // inject it here. if driverConfig.Command != "" { cmd := []string{driverConfig.Command} - if driverConfig.Args != "" { + if len(driverConfig.Args) != 0 { cmd = append(cmd, parsedArgs...) } d.logger.Printf("[DEBUG] driver.docker: setting container startup command to: %s", strings.Join(cmd, " ")) config.Cmd = cmd - } else if driverConfig.Args != "" { + } else if len(driverConfig.Args) != 0 { d.logger.Println("[DEBUG] driver.docker: ignoring command arguments because command is not specified") } @@ -431,7 +428,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if len(containers) != 1 { log.Printf("[ERR] driver.docker: failed to get id for container %s", config.Name) - return nil, fmt.Errorf("Failed to get id for container %s", config.Name, err) + return nil, fmt.Errorf("Failed to get id for container %s", config.Name) } log.Printf("[INFO] driver.docker: a container with the name %s already exists; will attempt to purge and re-create", config.Name) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 4c9300579..a61de381c 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -137,7 +137,7 @@ func TestDockerDriver_Start_Wait(t *testing.T) { Config: map[string]interface{}{ "image": "redis", "command": "redis-server", - "args": "-v", + "args": []string{"-v"}, }, Resources: &structs.Resources{ MemoryMB: 256, @@ -190,7 +190,11 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { Config: map[string]interface{}{ "image": "redis", "command": "/bin/bash", - "args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file), + "args": []string{ + "-c", + fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`, + string(exp), environment.AllocDir, file), + }, }, Resources: &structs.Resources{ MemoryMB: 256, @@ -243,7 +247,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { Config: map[string]interface{}{ "image": "redis", "command": "/bin/sleep", - "args": "10", + "args": []string{"10"}, }, Resources: basicResources, } diff --git a/client/driver/exec.go b/client/driver/exec.go index 1f9e5b7d9..a0a136523 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -24,10 +24,10 @@ type ExecDriver struct { fingerprint.StaticFingerprinter } type ExecDriverConfig struct { - ArtifactSource string `mapstructure:"artifact_source"` - Checksum string `mapstructure:"checksum"` - Command string `mapstructure:"command"` - Args string `mapstructure:"args"` + ArtifactSource string `mapstructure:"artifact_source"` + Checksum string `mapstructure:"checksum"` + Command string `mapstructure:"command"` + Args []string `mapstructure:"args"` } // execHandle is returned from Start/Open as a handle to the PID @@ -91,14 +91,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Get the environment variables. envVars := TaskEnvironmentVariables(ctx, task) - // Look for arguments - var args []string - if driverConfig.Args != "" { - args = append(args, driverConfig.Args) - } - // Setup the command - cmd := executor.Command(command, args...) + cmd := executor.Command(command, driverConfig.Args...) if err := cmd.Limit(task.Resources); err != nil { return nil, fmt.Errorf("failed to constrain resources: %s", err) } diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 7e6554e1d..f5470074a 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -39,7 +39,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "5", + "args": []string{"5"}, }, Resources: basicResources, } @@ -73,7 +73,7 @@ func TestExecDriver_Start_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "2", + "args": []string{"2"}, }, Resources: basicResources, } @@ -161,7 +161,10 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) { Config: map[string]interface{}{ "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), "command": "/bin/bash", - "args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)), + "args": []string{ + "-c", + fmt.Sprintf(`/bin/sleep 1 && %s`, filepath.Join("$NOMAD_TASK_DIR", file)), + }, }, Resources: basicResources, } @@ -204,7 +207,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/bash", - "args": fmt.Sprintf("-c \"sleep 1; echo -n %s > $%s/%s\"", string(exp), environment.AllocDir, file), + "args": []string{ + "-c", + fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`, string(exp), environment.AllocDir, file), + }, }, Resources: basicResources, } @@ -250,7 +256,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "1", + "args": []string{"1"}, }, Resources: basicResources, } diff --git a/client/driver/executor/exec_basic.go b/client/driver/executor/exec_basic.go index 438ae6b92..2ef4fbedc 100644 --- a/client/driver/executor/exec_basic.go +++ b/client/driver/executor/exec_basic.go @@ -29,7 +29,6 @@ type BasicExecutor struct { allocDir string } -// TODO: Have raw_exec use this as well. func NewBasicExecutor() Executor { return &BasicExecutor{} } @@ -63,12 +62,7 @@ func (e *BasicExecutor) Start() error { } e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map()) - combined := strings.Join(e.cmd.Args, " ") - parsed, err := args.ParseAndReplace(combined, envVars.Map()) - if err != nil { - return err - } - e.cmd.Args = parsed + e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map()) spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status")) e.spawn = spawn.NewSpawner(spawnState) diff --git a/client/driver/executor/exec_linux.go b/client/driver/executor/exec_linux.go index 14f4f809c..43c9270e0 100644 --- a/client/driver/executor/exec_linux.go +++ b/client/driver/executor/exec_linux.go @@ -167,12 +167,7 @@ func (e *LinuxExecutor) Start() error { } e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map()) - combined := strings.Join(e.cmd.Args, " ") - parsed, err := args.ParseAndReplace(combined, envVars.Map()) - if err != nil { - return err - } - e.cmd.Args = parsed + e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map()) spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status")) e.spawn = spawn.NewSpawner(spawnState) diff --git a/client/driver/executor/test_harness.go b/client/driver/executor/test_harness_test.go similarity index 97% rename from client/driver/executor/test_harness.go rename to client/driver/executor/test_harness_test.go index 10cbac371..b5413700c 100644 --- a/client/driver/executor/test_harness.go +++ b/client/driver/executor/test_harness_test.go @@ -112,7 +112,7 @@ func Executor_Start_Wait(t *testing.T, command buildExecCommand) { expected := "hello world" file := filepath.Join(allocdir.TaskLocal, "output.txt") absFilePath := filepath.Join(taskDir, file) - cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) + cmd := fmt.Sprintf(`/bin/sleep 1 ; echo -n %v > %v`, expected, file) e := command("/bin/bash", "-c", cmd) if err := e.Limit(constraint); err != nil { @@ -190,7 +190,7 @@ func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func() Ex expected := "hello world" file := filepath.Join(allocdir.TaskLocal, "output.txt") absFilePath := filepath.Join(taskDir, file) - cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) + cmd := fmt.Sprintf(`/bin/sleep 1 ; echo -n %v > %v`, expected, file) e := command("/bin/bash", "-c", cmd) if err := e.Limit(constraint); err != nil { diff --git a/client/driver/java.go b/client/driver/java.go index eb2930a28..eb475db32 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -28,10 +28,10 @@ type JavaDriver struct { } type JavaDriverConfig struct { - JvmOpts string `mapstructure:"jvm_options"` - ArtifactSource string `mapstructure:"artifact_source"` - Checksum string `mapstructure:"checksum"` - Args string `mapstructure:"args"` + JvmOpts []string `mapstructure:"jvm_options"` + ArtifactSource string `mapstructure:"artifact_source"` + Checksum string `mapstructure:"checksum"` + Args []string `mapstructure:"args"` } // javaHandle is returned from Start/Open as a handle to the PID @@ -126,15 +126,15 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, args := []string{} // Look for jvm options - if driverConfig.JvmOpts != "" { + if len(driverConfig.JvmOpts) != 0 { d.logger.Printf("[DEBUG] driver.java: found JVM options: %s", driverConfig.JvmOpts) - args = append(args, driverConfig.JvmOpts) + args = append(args, driverConfig.JvmOpts...) } // Build the argument list. args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName)) - if driverConfig.Args != "" { - args = append(args, driverConfig.Args) + if len(driverConfig.Args) != 0 { + args = append(args, driverConfig.Args...) } // Setup the command diff --git a/client/driver/java_test.go b/client/driver/java_test.go index a0c6d3b80..af6e44d6d 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -51,7 +51,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { Name: "demo-app", Config: map[string]interface{}{ "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", - "jvm_options": "-Xmx2048m -Xms256m", + "jvm_options": []string{"-Xmx64m", "-Xms32m"}, "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, Resources: basicResources, @@ -97,7 +97,6 @@ func TestJavaDriver_Start_Wait(t *testing.T) { Name: "demo-app", Config: map[string]interface{}{ "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", - "jvm_options": "-Xmx2048m -Xms256m", "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, Resources: basicResources, diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index d5202fc39..5b1d98269 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -93,15 +93,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl // Get the environment variables. envVars := TaskEnvironmentVariables(ctx, task) - // Look for arguments - var args []string - if driverConfig.Args != "" { - args = append(args, driverConfig.Args) - } - // Setup the command cmd := executor.NewBasicExecutor() - executor.SetCommand(cmd, command, args) + executor.SetCommand(cmd, command, driverConfig.Args) if err := cmd.Limit(task.Resources); err != nil { return nil, fmt.Errorf("failed to constrain resources: %s", err) } diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 1b7a0c8db..8b6e0c649 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -53,7 +53,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "1", + "args": []string{"1"}, }, Resources: basicResources, } @@ -151,7 +151,10 @@ func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) { Config: map[string]interface{}{ "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file), "command": "/bin/bash", - "args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)), + "args": []string{ + "-c", + fmt.Sprintf(`'/bin/sleep 1 && %s'`, filepath.Join("$NOMAD_TASK_DIR", file)), + }, }, Resources: basicResources, } @@ -190,7 +193,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "1", + "args": []string{"1"}, }, Resources: basicResources, } @@ -232,7 +235,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/bash", - "args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file), + "args": []string{ + "-c", + fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`, string(exp), environment.AllocDir, file), + }, }, Resources: basicResources, } @@ -277,7 +283,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": "1", + "args": []string{"1"}, }, Resources: basicResources, } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index d09eac1db..1fb6e9e06 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -37,8 +37,8 @@ type RktDriver struct { } type RktDriverConfig struct { - ImageName string `mapstructure:"image"` - Args string `mapstructure:"args"` + ImageName string `mapstructure:"image"` + Args []string `mapstructure:"args"` } // rktHandle is returned from Start/Open as a handle to the PID @@ -150,11 +150,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e } // Add user passed arguments. - if driverConfig.Args != "" { - parsed, err := args.ParseAndReplace(driverConfig.Args, envVars.Map()) - if err != nil { - return nil, err - } + if len(driverConfig.Args) != 0 { + parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map()) // Need to start arguments with "--" if len(parsed) > 0 { diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index a6b3dfb78..15db27b27 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -119,7 +119,7 @@ func TestRktDriver_Start_Wait(t *testing.T) { "trust_prefix": "coreos.com/etcd", "image": "coreos.com/etcd:v2.0.4", "command": "/etcd", - "args": "--version", + "args": []string{"--version"}, }, } @@ -160,7 +160,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { Config: map[string]interface{}{ "image": "coreos.com/etcd:v2.0.4", "command": "/etcd", - "args": "--version", + "args": []string{"--version"}, }, } @@ -202,7 +202,7 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) { "trust_prefix": "coreos.com/etcd", "image": "coreos.com/etcd:v2.0.4", "command": "/etcd", - "args": "--version", + "args": []string{"--version"}, }, } diff --git a/client/task_runner.go b/client/task_runner.go index 6a4e6ed46..cc4bddab8 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -25,6 +25,7 @@ type TaskRunner struct { ctx *driver.ExecContext allocID string restartTracker restartTracker + consulClient *ConsulClient task *structs.Task state *structs.TaskState @@ -52,13 +53,14 @@ type TaskStateUpdater func(taskName string) func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, allocID string, task *structs.Task, state *structs.TaskState, - restartTracker restartTracker) *TaskRunner { + restartTracker restartTracker, consulClient *ConsulClient) *TaskRunner { tc := &TaskRunner{ config: config, updater: updater, logger: logger, restartTracker: restartTracker, + consulClient: consulClient, ctx: ctx, allocID: allocID, task: task, @@ -231,6 +233,12 @@ func (r *TaskRunner) run() { var destroyErr error destroyed := false + // Register the services defined by the task with Consil + r.consulClient.Register(r.task, r.allocID) + + // De-Register the services belonging to the task from consul + defer r.consulClient.Deregister(r.task) + OUTER: // Wait for updates for { @@ -303,6 +311,7 @@ func (r *TaskRunner) run() { // Set force start because we are restarting the task. forceStart = true } + return } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index f38cf045a..1ada5060b 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - + consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500") // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -48,7 +48,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { } state := alloc.TaskStates[task.Name] - tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient) return upd, tr } @@ -89,7 +89,7 @@ func TestTaskRunner_Destroy(t *testing.T) { // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" - tr.task.Config["args"] = "10" + tr.task.Config["args"] = []string{"10"} go tr.Run() // Begin the tear down @@ -128,7 +128,7 @@ func TestTaskRunner_Update(t *testing.T) { // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" - tr.task.Config["args"] = "10" + tr.task.Config["args"] = []string{"10"} go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -153,7 +153,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" - tr.task.Config["args"] = "10" + tr.task.Config["args"] = []string{"10"} go tr.Run() defer tr.Destroy() @@ -164,8 +164,10 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner + consulClient, _ := NewConsulClient(tr.logger, "127.0.0.1:8500") tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker) + tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, + consulClient) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) } diff --git a/command/run.go b/command/run.go index 0ec0212e8..5b674b81b 100644 --- a/command/run.go +++ b/command/run.go @@ -124,6 +124,7 @@ func (c *RunCommand) Run(args []string) int { // This function is just a hammer and probably needs to be revisited. func convertJob(in *structs.Job) (*api.Job, error) { gob.Register([]map[string]interface{}{}) + gob.Register([]interface{}{}) var apiJob *api.Job buf := new(bytes.Buffer) if err := gob.NewEncoder(buf).Encode(in); err != nil { diff --git a/jobspec/parse.go b/jobspec/parse.go index 92f3c5048..58dbd6c9c 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -463,7 +463,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l } func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error { - task.Services = make([]structs.Service, len(serviceObjs.Items)) + task.Services = make([]*structs.Service, len(serviceObjs.Items)) var defaultServiceName bool for idx, o := range serviceObjs.Items { var service structs.Service @@ -503,7 +503,7 @@ func parseServices(jobName string, taskGroupName string, task *structs.Task, ser } } - task.Services[idx] = service + task.Services[idx] = &service } return nil diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 6eb19af11..d452d1a70 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -94,7 +94,7 @@ func TestParse(t *testing.T) { Config: map[string]interface{}{ "image": "hashicorp/binstore", }, - Services: []structs.Service{ + Services: []*structs.Service{ { Id: "", Name: "binstore-storagelocker-binsl-binstore", diff --git a/nomad/fsm.go b/nomad/fsm.go index 21b3538e4..71c40d68f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -13,13 +13,6 @@ import ( "github.com/hashicorp/raft" ) -var ( - msgpackHandle = &codec.MsgpackHandle{ - RawToString: true, - WriteExt: true, - } -) - const ( // timeTableGranularity is the granularity of index to time tracking timeTableGranularity = 5 * time.Minute @@ -328,7 +321,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { defer restore.Abort() // Create a decoder - dec := codec.NewDecoder(old, msgpackHandle) + dec := codec.NewDecoder(old, structs.MsgpackHandle) // Read in the header var header snapshotHeader @@ -412,7 +405,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now()) // Register the nodes - encoder := codec.NewEncoder(sink, msgpackHandle) + encoder := codec.NewEncoder(sink, structs.MsgpackHandle) // Write the header header := snapshotHeader{} diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 12a8484c0..53f246700 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -86,7 +86,6 @@ func Job() *structs.Job { Driver: "exec", Config: map[string]interface{}{ "command": "/bin/date", - "args": "+%s", }, Env: map[string]string{ "FOO": "bar", @@ -151,7 +150,6 @@ func SystemJob() *structs.Job { Driver: "exec", Config: map[string]interface{}{ "command": "/bin/date", - "args": "+%s", }, Resources: &structs.Resources{ CPU: 500, diff --git a/nomad/rpc.go b/nomad/rpc.go index 123c35028..e52e258f0 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -11,7 +11,6 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -53,24 +52,16 @@ const ( enqueueLimit = 30 * time.Second ) -var ( - // rpcHandle is the MsgpackHandle to be used by both Client and Server codecs. - rpcHandle = &codec.MsgpackHandle{ - // Enables proper encoding of strings within nil interfaces. - RawToString: true, - } -) - // NewClientCodec returns a new rpc.ClientCodec to be used to make RPC calls to // the Nomad Server. func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { - return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle) + return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle) } // NewServerCodec returns a new rpc.ServerCodec to be used by the Nomad Server // to handle rpcs. func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { - return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle) + return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle) } // listen is used to listen for incoming RPC connections diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4a0f29f88..a601fde29 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1009,7 +1009,7 @@ type ServiceCheck struct { Name string // Name of the check, defaults to id Type string // Type of the check - tcp, http, docker and script Script string // Script to invoke for script check - Http string // path of the health check url for http type check + Path string // path of the health check url for http type check Protocol string // Protocol to use if check is http, defaults to http Interval time.Duration // Interval of the check Timeout time.Duration // Timeout of the response from the check before consul fails the check @@ -1017,16 +1017,16 @@ type ServiceCheck struct { func (sc *ServiceCheck) Validate() error { t := strings.ToLower(sc.Type) - if sc.Type == ServiceCheckHTTP && sc.Http == "" { + if t != ServiceCheckTCP && t != ServiceCheckHTTP { + return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type) + } + if sc.Type == ServiceCheckHTTP && sc.Path == "" { return fmt.Errorf("http checks needs the Http path information.") } if sc.Type == ServiceCheckScript && sc.Script == "" { return fmt.Errorf("Script checks need the script to invoke") } - if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript { - return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type) - } return nil } @@ -1064,7 +1064,7 @@ type Task struct { Env map[string]string // List of service definitions exposed by the Task - Services []Service + Services []*Service // Constraints can be specified at a task level and apply only to // the particular task. @@ -1704,7 +1704,7 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) { } // msgpackHandle is a shared handle for encoding/decoding of structs -var msgpackHandle = func() *codec.MsgpackHandle { +var MsgpackHandle = func() *codec.MsgpackHandle { h := &codec.MsgpackHandle{RawToString: true} // Sets the default type for decoding a map into a nil interface{}. @@ -1716,13 +1716,13 @@ var msgpackHandle = func() *codec.MsgpackHandle { // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { - return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) + return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out) } // Encode is used to encode a MsgPack object with type prefix func Encode(t MessageType, msg interface{}) ([]byte, error) { var buf bytes.Buffer buf.WriteByte(uint8(t)) - err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) + err := codec.NewEncoder(&buf, MsgpackHandle).Encode(msg) return buf.Bytes(), err } diff --git a/nomad/timetable_test.go b/nomad/timetable_test.go index 3a771d6fa..a76446a13 100644 --- a/nomad/timetable_test.go +++ b/nomad/timetable_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/structs" ) func TestTimeTable(t *testing.T) { @@ -104,14 +105,14 @@ func TestTimeTable_SerializeDeserialize(t *testing.T) { tt.Witness(50, plusHour) var buf bytes.Buffer - enc := codec.NewEncoder(&buf, msgpackHandle) + enc := codec.NewEncoder(&buf, structs.MsgpackHandle) err := tt.Serialize(enc) if err != nil { t.Fatalf("err: %v", err) } - dec := codec.NewDecoder(&buf, msgpackHandle) + dec := codec.NewDecoder(&buf, structs.MsgpackHandle) tt2 := NewTimeTable(time.Second, time.Minute) err = tt2.Deserialize(dec) diff --git a/scripts/deps.sh b/scripts/deps.sh index d2352b782..2c2899ff2 100755 --- a/scripts/deps.sh +++ b/scripts/deps.sh @@ -7,6 +7,8 @@ GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/units GOOS=linux go get $DEP_ARGS github.com/docker/docker/pkg/mount GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/cgroups/fs GOOS=linux go get $DEP_ARGS github.com/opencontainers/runc/libcontainer/configs +GOOS=linux go get $DEP_ARGS github.com/coreos/go-systemd/util +GOOS=linux go get $DEP_ARGS github.com/coreos/go-systemd/dbus # Get the rest of the deps DEPS=$(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) diff --git a/scripts/test.sh b/scripts/test.sh index 7071669a7..cbb2d016d 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -10,4 +10,4 @@ go build -o $TEMPDIR/nomad || exit 1 # Run the tests echo "--> Running tests" -go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=40s +go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=80s diff --git a/version.go b/version.go index 884805b31..80560f6db 100644 --- a/version.go +++ b/version.go @@ -5,9 +5,9 @@ var GitCommit string var GitDescribe string // The main version number that is being run at the moment. -const Version = "0.1.2" +const Version = "0.2.0" // A pre-release marker for the version. If this is "" (empty string) // then it means that it is a final release. Otherwise, this is a pre-release // such as "dev" (in development), "beta", "rc1", etc. -const VersionPrerelease = "" +const VersionPrerelease = "dev" diff --git a/website/helpers/command_helpers.rb b/website/helpers/command_helpers.rb index 45d437599..0718f2d48 100644 --- a/website/helpers/command_helpers.rb +++ b/website/helpers/command_helpers.rb @@ -2,8 +2,8 @@ module CommandHelpers # Returns the markdown text for the general options usage. def general_options_usage() <`: The address of the Nomad server. Overrides the "NOMAD_ADDR" - environment variable if set. Defaults to "http://127.0.0.1:4646". +* `-address=`: The address of the Nomad server. Overrides the `NOMAD_ADDR` + environment variable if set. Defaults to `http://127.0.0.1:4646`. EOF end end diff --git a/website/source/docs/drivers/docker.html.md b/website/source/docs/drivers/docker.html.md index 92cbab07b..79c884859 100644 --- a/website/source/docs/drivers/docker.html.md +++ b/website/source/docs/drivers/docker.html.md @@ -32,13 +32,13 @@ task "webservice" { The following options are available for use in the job specification. -* `image` - (Required) The Docker image to run. The image may include a tag or +* `image` - The Docker image to run. The image may include a tag or custom URL. By default it will be fetched from Docker Hub. * `command` - (Optional) The command to run when starting the container. -* `args` - (Optional) Arguments to the optional `command`. If no `command` is - present, `args` are ignored. +* `args` - (Optional) A list of arguments to the optional `command`. If no + `command` is present, `args` are ignored. * `labels` - (Optional) A key/value map of labels to set to the containers on start. diff --git a/website/source/docs/drivers/exec.html.md b/website/source/docs/drivers/exec.html.md index f897b1ea4..3d921aa6d 100644 --- a/website/source/docs/drivers/exec.html.md +++ b/website/source/docs/drivers/exec.html.md @@ -20,15 +20,19 @@ scripts or other wrappers which provide higher level features. The `exec` driver supports the following configuration in the job spec: -* `command` - (Required) The command to execute. Must be provided. -* `artifact_source` – (Optional) Source location of an executable artifact. Must be accessible -from the Nomad client. If you specify an `artifact_source` to be executed, you -must reference it in the `command` as show in the examples below -* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image. -The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`, -and the value is the computed checksum. If a checksum is supplied and does not -match the downloaded artifact, the driver will fail to start -* `args` - The argument list to the command, space seperated. Optional. +* `command` - The command to execute. Must be provided. + +* `artifact_source` – (Optional) Source location of an executable artifact. Must + be accessible from the Nomad client. If you specify an `artifact_source` to be + executed, you must reference it in the `command` as show in the examples below + +* `checksum` - (Optional) The checksum type and value for the `artifact_source` + image. The format is `type:value`, where type is any of `md5`, `sha1`, + `sha256`, or `sha512`, and the value is the computed checksum. If a checksum + is supplied and does not match the downloaded artifact, the driver will fail + to start + +* `args` - (Optional) A list of arguments to the `command`. ## Client Requirements diff --git a/website/source/docs/drivers/java.html.md b/website/source/docs/drivers/java.html.md index f2bbd2b76..45baacb72 100644 --- a/website/source/docs/drivers/java.html.md +++ b/website/source/docs/drivers/java.html.md @@ -18,17 +18,19 @@ HTTP from the Nomad client. The `java` driver supports the following configuration in the job spec: -* `artifact_source` - **(Required)** The hosted location of the source Jar file. Must be accessible -from the Nomad client -* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image. -The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`, -and the value is the computed checksum. If a checksum is supplied and does not -match the downloaded artifact, the driver will fail to start +* `artifact_source` - The hosted location of the source Jar file. Must be + accessible from the Nomad client -* `args` - **(Optional)** The argument list for the `java` command, space separated. +* `checksum` - (Optional) The checksum type and value for the `artifact_source` + image. The format is `type:value`, where type is any of `md5`, `sha1`, + `sha256`, or `sha512`, and the value is the computed checksum. If a checksum + is supplied and does not match the downloaded artifact, the driver will fail + to start -* `jvm_options` - **(Optional)** JVM options to be passed while invoking java. These options - are passed not validated in any way in Nomad. +* `args` - (Optional) A list of arguments to the `java` command. + +* `jvm_options` - (Optional) A list of JVM options to be passed while invoking + java. These options are passed not validated in any way in Nomad. ## Client Requirements diff --git a/website/source/docs/drivers/qemu.html.md b/website/source/docs/drivers/qemu.html.md index 84909e331..d329c7a8e 100644 --- a/website/source/docs/drivers/qemu.html.md +++ b/website/source/docs/drivers/qemu.html.md @@ -23,16 +23,19 @@ The `Qemu` driver can execute any regular `qemu` image (e.g. `qcow`, `img`, The `Qemu` driver supports the following configuration in the job spec: -* `artifact_source` - **(Required)** The hosted location of the source Qemu image. Must be accessible +* `artifact_source` - The hosted location of the source Qemu image. Must be accessible from the Nomad client, via HTTP. -* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image. + +* `checksum` - (Optional) The checksum type and value for the `artifact_source` image. The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`, and the value is the computed checksum. If a checksum is supplied and does not match the downloaded artifact, the driver will fail to start + * `accelerator` - (Optional) The type of accelerator to use in the invocation. If the host machine has `Qemu` installed with KVM support, users can specify `kvm` for the `accelerator`. Default is `tcg` -* `port_map` - **(Optional)** A `map[string]int` that maps port labels to ports + +* `port_map` - (Optional) A `map[string]int` that maps port labels to ports on the guest. This forwards the host port to the guest vm. For example, `port_map { db = 6539 }` would forward the host port with label `db` to the guest vm's port 6539. diff --git a/website/source/docs/drivers/raw_exec.html.md b/website/source/docs/drivers/raw_exec.html.md index 2dc741887..c76825f25 100644 --- a/website/source/docs/drivers/raw_exec.html.md +++ b/website/source/docs/drivers/raw_exec.html.md @@ -18,15 +18,19 @@ As such, it should be used with extreme care and is disabled by default. The `raw_exec` driver supports the following configuration in the job spec: -* `command` - (Required) The command to execute. Must be provided. -* `artifact_source` – (Optional) Source location of an executable artifact. Must be accessible -from the Nomad client. If you specify an `artifact_source` to be executed, you -must reference it in the `command` as show in the examples below -* `checksum` - **(Optional)** The checksum type and value for the `artifact_source` image. -The format is `type:value`, where type is any of `md5`, `sha1`, `sha256`, or `sha512`, -and the value is the computed checksum. If a checksum is supplied and does not -match the downloaded artifact, the driver will fail to start -* `args` - The argument list to the command, space seperated. Optional. +* `command` - The command to execute. Must be provided. + +* `artifact_source` – (Optional) Source location of an executable artifact. Must + be accessible from the Nomad client. If you specify an `artifact_source` to be + executed, you must reference it in the `command` as show in the examples below + +* `checksum` - (Optional) The checksum type and value for the `artifact_source` + image. The format is `type:value`, where type is any of `md5`, `sha1`, + `sha256`, or `sha512`, and the value is the computed checksum. If a checksum + is supplied and does not match the downloaded artifact, the driver will fail + to start + +* `args` - (Optional) A list of arguments to the `command`. ## Client Requirements diff --git a/website/source/docs/drivers/rkt.html.md b/website/source/docs/drivers/rkt.html.md index ddbb76601..7ce3e1a49 100644 --- a/website/source/docs/drivers/rkt.html.md +++ b/website/source/docs/drivers/rkt.html.md @@ -20,13 +20,16 @@ being marked as experimental and should be used with care. The `rkt` driver supports the following configuration in the job spec: -* `trust_prefix` - **(Optional)** The trust prefix to be passed to rkt. Must be reachable from -the box running the nomad agent. If not specified, the image is run without -verifying the image signature. -* `image` - **(Required)** The image to run which may be specified by name, -hash, ACI address or docker registry. -* `command` - **(Optional**) A command to execute on the ACI. -* `args` - **(Optional**) A string of args to pass into the image. +* `image` - The image to run which may be specified by name, hash, ACI address + or docker registry. + +* `command` - (Optional) A command to execute on the ACI. + +* `args` - (Optional) A list of arguments to the image. + +* `trust_prefix` - (Optional) The trust prefix to be passed to rkt. Must be + reachable from the box running the nomad agent. If not specified, the image is + run without verifying the image signature. ## Task Directories diff --git a/website/source/docs/internals/architecture.html.md b/website/source/docs/internals/architecture.html.md index 9c0079f1b..92102405a 100644 --- a/website/source/docs/internals/architecture.html.md +++ b/website/source/docs/internals/architecture.html.md @@ -64,8 +64,8 @@ clarify what is being discussed: Regions may contain multiple datacenters. Servers are assigned to regions and manage all state for the region and make scheduling decisions within that region. Requests that are made between regions are forwarded to the appropriate servers. As an example, you may - have a "US" region with the "us-east-1" and "us-west-1" datacenters, connected to the - "EU" region with the "eu-fr-1" and "eu-uk-1" datacenters. + have a `US` region with the `us-east-1` and `us-west-1` datacenters, connected to the + `EU` region with the `eu-fr-1` and `eu-uk-1` datacenters. * **Bin Packing** - Bin Packing is the process of filling bins with items in a way that maximizes the utilization of bins. This extends to Nomad, where the clients are "bins" diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index cffc04ed1..f34680de2 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -47,6 +47,15 @@ job "my-service" { config { image = "hashicorp/web-frontend" } + service { + port = "http" + check { + type = "http" + path = "/health" + interval = "10s" + timeout = "2s" + } + } env { DB_HOST = "db01.example.com" DB_USER = "web" @@ -57,10 +66,13 @@ job "my-service" { memory = 128 network { mbits = 100 - dynamic_ports = [ - "http", - "https", - ] + # Request for a dynamic port + port "http" { + } + # Request for a static port + port "https" { + static = 443 + } } } } @@ -169,7 +181,7 @@ The `task` object supports the following keys: * `driver` - Specifies the task driver that should be used to run the task. See the [driver documentation](/docs/drivers/index.html) for what - is available. Examples include "docker", "qemu", "java", and "exec". + is available. Examples include `docker`, `qemu`, `java`, and `exec`. * `constraint` - This can be provided multiple times to define additional constraints. See the constraint reference for more details. @@ -178,6 +190,11 @@ The `task` object supports the following keys: to start the task. The details of configurations are specific to each driver. +* `service` - Nomad integrates with Consul for Service Discovery. A service + block represents a routable and discoverable service on the network. Nomad + automatically registers when a Task is started and de-registers it when the + Task transitons to the DEAD state. To learn more about Services please visit [here](/docs/jobspec/servicediscovery.html.md) + * `env` - A map of key/value representing environment variables that will be passed along to the running process. @@ -223,14 +240,14 @@ The `restart` object supports the following keys: number of restarts allowed in an `interval` before a restart delay is added. * `interval` - `interval` is only valid on non-batch jobs and is a time duration - that can be specified using the "s", "m", and "h" suffixes, such as "30s". + that can be specified using the `s`, `m`, and `h` suffixes, such as `30s`. The `interval` begins when the first task starts and ensures that only `attempts` number of restarts happens within it. If more than `attempts` number of failures happen, the restart is delayed till after the `interval`, which is then reset. * `delay` - A duration to wait before restarting a task. It is specified as a - time duration using the "s", "m", and "h" suffixes, such as "30s". + time duration using the `s`, `m`, and `h` suffixes, such as `30s`. The default `batch` restart policy is: @@ -266,7 +283,7 @@ The `constraint` object supports the following keys: This can be a literal value or another attribute. * `version` - Specifies a version constraint against the attribute. - This sets the operator to "version" and the `value` to what is + This sets the operator to `version` and the `value` to what is specified. This supports a comma seperated list of constraints, including the pessimistic operator. See the [go-version](https://github.com/hashicorp/go-version) repository @@ -326,7 +343,7 @@ Below is a table documenting common node attributes: arch - CPU architecture of the client. Examples: "amd64", "386" + CPU architecture of the client. Examples: `amd64`, `386` consul.datacenter @@ -346,11 +363,11 @@ Below is a table documenting common node attributes: kernel.name - Kernel of the client. Examples: "linux", "darwin" + Kernel of the client. Examples: `linux`, `darwin` kernel.version - Version of the client kernel. Examples: "3.19.0-25-generic", "15.0.0" + Version of the client kernel. Examples: `3.19.0-25-generic`, `15.0.0` platform.aws.ami-id @@ -362,7 +379,7 @@ Below is a table documenting common node attributes: os.name - Operating system of the client. Examples: "ubuntu", "windows", "darwin" + Operating system of the client. Examples: `ubuntu`, `windows`, `darwin` os.version diff --git a/website/source/docs/jobspec/servicediscovery.html.md b/website/source/docs/jobspec/servicediscovery.html.md new file mode 100644 index 000000000..87245c136 --- /dev/null +++ b/website/source/docs/jobspec/servicediscovery.html.md @@ -0,0 +1,120 @@ +--- +layout: "docs" +page_title: "Service Discovery in Nomad" +sidebar_current: "docs-jobspec-service-discovery" +description: |- + Learn how to add service discovery to jobs +--- + +# Service Discovery + +Nomad schedules workloads of various types across a cluster of generic hosts. +Because of this, placement is not known in advance and you will need to use +service discovery to connect tasks to other services deployed across your +cluster. Nomad integrates with [Consul](https://consul.io) to provide service +discovery and monitoring. + +Note that in order to use Consul with Nomad, you will need to configure and +install Consul on your nodes alongside Nomad, or schedule it as a system job. +Nomad does not currently run Consul for you. + +## Configuration + +* `consul.address`: This is a Nomad client configuration which can be used to + override the default Consul Agent HTTP port that Nomad uses to connect to + Consul. The default for this is `127.0.0.1:8500`. + +## Service Definition Syntax + +The service blocks in a Task definition defines a service which Nomad will +register with Consul. Multiple Service blocks are allowed in a Task definition, +which allow registering multiple services for a task that exposes multiple +ports. + +### Example + +A brief example of a service definition in a Task + +``` +group "database" { + task "mysql" { + driver = "docker" + service { + tags = ["master", "mysql"] + port = "db" + check { + type = "tcp" + delay = "10s" + timeout = "2s" + } + } + resources { + cpu = 500 + memory = 1024 + network { + mbits = 10 + port "db" { + } + } + } + } +} +``` + +* `name`: Nomad automatically determines the name of a Task. By default the + name of a service is $(job-name)-$(task-group)-$(task-name). Users can + explicitly name the service by specifying this option. If multiple services + are defined for a Task then only one task can have the default name, all the + services have to be explicitly named. Nomad will add the prefix ```$(job-name + )-${task-group}-${task-name}``` prefix to each user defined name. + +* `tags`: A list of tags associated with this Service. + +* `port`: The port indicates the port associated with the Service. Users are + required to specify a valid port label here which they have defined in the + resources block. This could be a label to either a dynamic or a static port. + If an incorrect port label is specified, Nomad doesn't register the service + with Consul. + +* `check`: A check block defines a health check associated with the service. + Multiple check blocks are allowed for a service. Nomad currently supports + only the `http` and `tcp` Consul Checks. + +### Check Syntax + +* `type`: This indicates the check types supported by Nomad. Valid options are + currently `http` and `tcp`. In the future Nomad will add support for more + Consul checks. + +* `delay`: This indicates the frequency of the health checks that Consul with + perform. + +* `timeout`: This indicates how long Consul will wait for a health check query + to succeed. + +* `path`: The path of the http endpoint which Consul will query to query the + health of a service if the type of the check is `http`. Nomad will add the ip + of the service and the port, users are only required to add the relative url + of the health check endpoint. + +* `protocol`: This indicates the protocol for the http checks. Valid options + are `http` and `https`. We default it to `http` + +## Assumptions + +* Consul 0.6 is needed for using the TCP checks. + +* The Service Discovery feature in Nomad depends on Operators making sure that + the Nomad client can reach the consul agent. + +* Nomad assumes that it controls the life cycle of all the externally + discoverable services running on a host. + +* Tasks running inside Nomad also needs to reach out to the Consul agent if + they want to use any of the Consul APIs. Ex: A task running inside a docker + container in the bridge mode won't be able to talk to a Consul Agent running + on the loopback interface of the host since the container in the bridge mode + has it's own network interface and doesn't see interfaces on the global + network namespace of the host. There are a couple of ways to solve this, one + way is to run the container in the host networking mode, or make the Consul + agent listen on an interface on the network namespace of the container. diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index fa92a7072..8f4f52bd5 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -41,8 +41,8 @@ > Scheduler Types - > - Restart Policies + > + Service Discovery > Networking