diff --git a/api/tasks.go b/api/tasks.go index e5ae46b5c..2990b5433 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -20,6 +20,28 @@ func NewRestartPolicy() *RestartPolicy { } } +// The ServiceCheck data model represents the consul health check that +// Nomad registers for a Task +type ServiceCheck struct { + Id string + Name string + Type string + Script string + Http string + Protocol string + Interval time.Duration + Timeout time.Duration +} + +// The Service model represents a Consul service defintion +type Service struct { + Id string + Name string + Tags []string + PortLabel string `mapstructure:"port"` + Checks []ServiceCheck +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name string @@ -68,6 +90,7 @@ type Task struct { Config map[string]interface{} Constraints []*Constraint Env map[string]string + Services []Service Resources *Resources Meta map[string]string } diff --git a/command/init.go b/command/init.go index c911c95cf..92554ce45 100644 --- a/command/init.go +++ b/command/init.go @@ -128,6 +128,18 @@ job "example" { } } + service { + # name = redis + tags = ["global", "cache"] + port = "db" + check { + name = "alive" + type = "tcp" + interval = "10s" + timeout = "2s" + } + } + # We must specify the resources required for # this task to ensure it runs on a machine with # enough capacity. diff --git a/jobspec/parse.go b/jobspec/parse.go index 24772364f..92f3c5048 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -144,7 +144,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { // If we have tasks outside, create TaskGroups for them if o := listVal.Filter("task"); len(o.Items) > 0 { var tasks []*structs.Task - if err := parseTasks(&tasks, o); err != nil { + if err := parseTasks(result.Name, "", &tasks, o); err != nil { return err } @@ -247,7 +247,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { // Parse tasks if o := listVal.Filter("task"); len(o.Items) > 0 { - if err := parseTasks(&g.Tasks, o); err != nil { + if err := parseTasks(result.Name, g.Name, &g.Tasks, o); err != nil { return err } } @@ -346,7 +346,7 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error return nil } -func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error { +func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, list *ast.ObjectList) error { list = list.Children() if len(list.Items) == 0 { return nil @@ -378,12 +378,16 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error { delete(m, "config") delete(m, "env") delete(m, "constraint") + delete(m, "service") delete(m, "meta") delete(m, "resources") // Build the task var t structs.Task t.Name = n + if taskGroupName == "" { + taskGroupName = n + } if err := mapstructure.WeakDecode(m, &t); err != nil { return err } @@ -401,6 +405,12 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error { } } + if o := listVal.Filter("service"); len(o.Items) > 0 { + if err := parseServices(jobName, taskGroupName, &t, o); err != nil { + return err + } + } + // If we have config, then parse that if o := listVal.Filter("config"); len(o.Items) > 0 { for _, o := range o.Elem().Items { @@ -452,6 +462,79 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error { return nil } +func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error { + task.Services = make([]structs.Service, len(serviceObjs.Items)) + var defaultServiceName bool + for idx, o := range serviceObjs.Items { + var service structs.Service + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + delete(m, "check") + + if err := mapstructure.WeakDecode(m, &service); err != nil { + return err + } + + if defaultServiceName && service.Name == "" { + return fmt.Errorf("Only one service block may omit the Name field") + } + + if service.Name == "" { + defaultServiceName = true + service.Name = fmt.Sprintf("%s-%s-%s", jobName, taskGroupName, task.Name) + } else { + service.Name = fmt.Sprintf("%s-%s-%s-%s", jobName, taskGroupName, task.Name, service.Name) + } + + // Fileter checks + var checkList *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + checkList = ot.List + } else { + return fmt.Errorf("service '%s': should be an object", service.Name) + } + + if co := checkList.Filter("check"); len(co.Items) > 0 { + if err := parseChecks(&service, co); err != nil { + return err + } + } + + task.Services[idx] = service + } + + return nil +} + +func parseChecks(service *structs.Service, checkObjs *ast.ObjectList) error { + service.Checks = make([]structs.ServiceCheck, len(checkObjs.Items)) + for idx, co := range checkObjs.Items { + var check structs.ServiceCheck + var cm map[string]interface{} + if err := hcl.DecodeObject(&cm, co.Val); err != nil { + return err + } + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &check, + }) + if err != nil { + return err + } + if err := dec.Decode(cm); err != nil { + return err + } + + service.Checks[idx] = check + } + + return nil +} + func parseResources(result *structs.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 125127de5..6eb19af11 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -94,6 +94,23 @@ func TestParse(t *testing.T) { Config: map[string]interface{}{ "image": "hashicorp/binstore", }, + Services: []structs.Service{ + { + Id: "", + Name: "binstore-storagelocker-binsl-binstore", + Tags: []string{"foo", "bar"}, + PortLabel: "http", + Checks: []structs.ServiceCheck{ + { + Id: "", + Name: "check-name", + Type: "tcp", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + }, + }, + }, + }, Env: map[string]string{ "HELLO": "world", "LOREM": "ipsum", @@ -301,7 +318,7 @@ func TestBadPorts(t *testing.T) { func TestOverlappingPorts(t *testing.T) { path, err := filepath.Abs(filepath.Join("./test-fixtures", "overlapping-ports.hcl")) if err != nil { - t.Fatalf("Can't get absoluate path for file: %s", err) + t.Fatalf("Can't get absolute path for file: %s", err) } _, err = ParseFile(path) @@ -314,3 +331,20 @@ func TestOverlappingPorts(t *testing.T) { t.Fatalf("Expected collision error; got %v", err) } } + +func TestIncompleteServiceDefn(t *testing.T) { + path, err := filepath.Abs(filepath.Join("./test-fixtures", "incorrect-service-def.hcl")) + if err != nil { + t.Fatalf("Can't get absolute path for file: %s", err) + } + + _, err = ParseFile(path) + + if err == nil { + t.Fatalf("Expected an error") + } + + if !strings.Contains(err.Error(), "Only one service block may omit the Name field") { + t.Fatalf("Expected collision error; got %v", err) + } +} diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 236f4829a..9696fdef8 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -45,6 +45,16 @@ job "binstore-storagelocker" { HELLO = "world" LOREM = "ipsum" } + service { + tags = ["foo", "bar"] + port = "http" + check { + name = "check-name" + type = "tcp" + interval = "10s" + timeout = "2s" + } + } resources { cpu = 500 memory = 128 diff --git a/jobspec/test-fixtures/incorrect-service-def.hcl b/jobspec/test-fixtures/incorrect-service-def.hcl new file mode 100644 index 000000000..8a0029842 --- /dev/null +++ b/jobspec/test-fixtures/incorrect-service-def.hcl @@ -0,0 +1,77 @@ +job "binstore-storagelocker" { + region = "global" + type = "service" + priority = 50 + all_at_once = true + datacenters = ["us2", "eu1"] + + meta { + foo = "bar" + } + + constraint { + attribute = "kernel.os" + value = "windows" + } + + update { + stagger = "60s" + max_parallel = 2 + } + + task "outside" { + driver = "java" + config { + jar = "s3://my-cool-store/foo.jar" + } + meta { + my-cool-key = "foobar" + } + } + + group "binsl" { + count = 5 + restart { + attempts = 5 + interval = "10m" + delay = "15s" + } + task "binstore" { + driver = "docker" + config { + image = "hashicorp/binstore" + } + env { + HELLO = "world" + LOREM = "ipsum" + } + service { + tags = ["foo", "bar"] + port = "http" + check { + name = "check-name" + type = "http" + interval = "10s" + timeout = "2s" + } + } + service { + port = "one" + } + resources { + cpu = 500 + memory = 128 + + network { + mbits = "100" + port "one" { + static = 1 + } + port "three" { + static = 3 + } + port "http" {} + } + } + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 746b9b6c8..4a0f29f88 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -995,6 +995,60 @@ func (tg *TaskGroup) GoString() string { return fmt.Sprintf("*%#v", *tg) } +const ( + ServiceCheckHTTP = "http" + ServiceCheckTCP = "tcp" + ServiceCheckDocker = "docker" + ServiceCheckScript = "script" +) + +// The ServiceCheck data model represents the consul health check that +// Nomad registers for a Task +type ServiceCheck struct { + Id string // Id of the check, must be unique and it is autogenrated + Name string // Name of the check, defaults to id + Type string // Type of the check - tcp, http, docker and script + Script string // Script to invoke for script check + Http string // path of the health check url for http type check + Protocol string // Protocol to use if check is http, defaults to http + Interval time.Duration // Interval of the check + Timeout time.Duration // Timeout of the response from the check before consul fails the check +} + +func (sc *ServiceCheck) Validate() error { + t := strings.ToLower(sc.Type) + if sc.Type == ServiceCheckHTTP && sc.Http == "" { + return fmt.Errorf("http checks needs the Http path information.") + } + + if sc.Type == ServiceCheckScript && sc.Script == "" { + return fmt.Errorf("Script checks need the script to invoke") + } + if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript { + return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type) + } + return nil +} + +// The Service model represents a Consul service defintion +type Service struct { + Id string // Id of the service, this needs to be unique on a local machine + Name string // Name of the service, defaults to id + Tags []string // List of tags for the service + PortLabel string `mapstructure:"port"` // port for the service + Checks []ServiceCheck // List of checks associated with the service +} + +func (s *Service) Validate() error { + var mErr multierror.Error + for _, c := range s.Checks { + if err := c.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() +} + // Task is a single process typically that is executed as part of a task group. type Task struct { // Name of the task @@ -1009,6 +1063,9 @@ type Task struct { // Map of environment variables to be used by the driver Env map[string]string + // List of service definitions exposed by the Task + Services []Service + // Constraints can be specified at a task level and apply only to // the particular task. Constraints []*Constraint @@ -1132,6 +1189,12 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, outer) } } + + for _, service := range t.Services { + if err := service.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } return mErr.ErrorOrNil() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 8221c40fd..84af2a198 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -357,9 +357,21 @@ func TestEncodeDecode(t *testing.T) { } } -func TestBatchRestartPolicyValidate(t *testing.T) { - rp := RestartPolicy{Attempts: 10, Delay: 25 * time.Second} - if err := rp.Validate(); err != nil { - t.Fatalf("err: %v", err) +func TestInvalidServiceCheck(t *testing.T) { + s := Service{ + Id: "service-id", + Name: "service-name", + PortLabel: "bar", + Checks: []ServiceCheck{ + { + + Id: "check-id", + Name: "check-name", + Type: "lol", + }, + }, + } + if err := s.Validate(); err == nil { + t.Fatalf("Service should be invalid") } }