diff --git a/api/services.go b/api/services.go new file mode 100644 index 000000000..47da17930 --- /dev/null +++ b/api/services.go @@ -0,0 +1,157 @@ +package api + +import ( + "fmt" + "time" +) + +// CheckRestart describes if and when a task should be restarted based on +// failing health checks. +type CheckRestart struct { + Limit int `mapstructure:"limit"` + Grace *time.Duration `mapstructure:"grace"` + IgnoreWarnings bool `mapstructure:"ignore_warnings"` +} + +// Canonicalize CheckRestart fields if not nil. +func (c *CheckRestart) Canonicalize() { + if c == nil { + return + } + + if c.Grace == nil { + c.Grace = timeToPtr(1 * time.Second) + } +} + +// Copy returns a copy of CheckRestart or nil if unset. +func (c *CheckRestart) Copy() *CheckRestart { + if c == nil { + return nil + } + + nc := new(CheckRestart) + nc.Limit = c.Limit + if c.Grace != nil { + g := *c.Grace + nc.Grace = &g + } + nc.IgnoreWarnings = c.IgnoreWarnings + return nc +} + +// Merge values from other CheckRestart over default values on this +// CheckRestart and return merged copy. +func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart { + if c == nil { + // Just return other + return o + } + + nc := c.Copy() + + if o == nil { + // Nothing to merge + return nc + } + + if o.Limit > 0 { + nc.Limit = o.Limit + } + + if o.Grace != nil { + nc.Grace = o.Grace + } + + if o.IgnoreWarnings { + nc.IgnoreWarnings = o.IgnoreWarnings + } + + return nc +} + +// ServiceCheck represents the consul health check that Nomad registers. +type ServiceCheck struct { + //FIXME Id is unused. Remove? + Id string + Name string + Type string + Command string + Args []string + Path string + Protocol string + PortLabel string `mapstructure:"port"` + AddressMode string `mapstructure:"address_mode"` + Interval time.Duration + Timeout time.Duration + InitialStatus string `mapstructure:"initial_status"` + TLSSkipVerify bool `mapstructure:"tls_skip_verify"` + Header map[string][]string + Method string + CheckRestart *CheckRestart `mapstructure:"check_restart"` + GRPCService string `mapstructure:"grpc_service"` + GRPCUseTLS bool `mapstructure:"grpc_use_tls"` +} + +// Service represents a Consul service definition. +type Service struct { + //FIXME Id is unused. Remove? + Id string + Name string + Tags []string + CanaryTags []string `mapstructure:"canary_tags"` + PortLabel string `mapstructure:"port"` + AddressMode string `mapstructure:"address_mode"` + Checks []ServiceCheck + CheckRestart *CheckRestart `mapstructure:"check_restart"` + Connect *ConsulConnect +} + +// Canonicalize the Service by ensuring its name and address mode are set. Task +// will be nil for group services. +func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { + if s.Name == "" { + if t != nil { + s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name) + } else { + s.Name = fmt.Sprintf("%s-%s", *job.Name, *tg.Name) + } + } + + // Default to AddressModeAuto + if s.AddressMode == "" { + s.AddressMode = "auto" + } + + // Canonicalize CheckRestart on Checks and merge Service.CheckRestart + // into each check. + for i, check := range s.Checks { + s.Checks[i].CheckRestart = s.CheckRestart.Merge(check.CheckRestart) + s.Checks[i].CheckRestart.Canonicalize() + } +} + +// ConsulConnect represents a Consul Connect jobspec stanza. +type ConsulConnect struct { + Native bool + SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"` +} + +// ConsulSidecarService represents a Consul Connect SidecarService jobspec +// stanza. +type ConsulSidecarService struct { + Port string + Proxy *ConsulProxy +} + +// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza. +type ConsulProxy struct { + Upstreams []*ConsulUpstream + Config map[string]interface{} +} + +// ConsulUpstream represents a Consul Connect upstream jobspec stanza. +type ConsulUpstream struct { + DestinationName string `mapstructure:"destination_name"` + LocalBindPort int `mapstructure:"local_bind_port"` +} diff --git a/api/tasks.go b/api/tasks.go index e627dae09..f401922e9 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -274,144 +274,6 @@ func (s *Spread) Canonicalize() { } } -// CheckRestart describes if and when a task should be restarted based on -// failing health checks. -type CheckRestart struct { - Limit int `mapstructure:"limit"` - Grace *time.Duration `mapstructure:"grace"` - IgnoreWarnings bool `mapstructure:"ignore_warnings"` -} - -// Canonicalize CheckRestart fields if not nil. -func (c *CheckRestart) Canonicalize() { - if c == nil { - return - } - - if c.Grace == nil { - c.Grace = timeToPtr(1 * time.Second) - } -} - -// Copy returns a copy of CheckRestart or nil if unset. -func (c *CheckRestart) Copy() *CheckRestart { - if c == nil { - return nil - } - - nc := new(CheckRestart) - nc.Limit = c.Limit - if c.Grace != nil { - g := *c.Grace - nc.Grace = &g - } - nc.IgnoreWarnings = c.IgnoreWarnings - return nc -} - -// Merge values from other CheckRestart over default values on this -// CheckRestart and return merged copy. -func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart { - if c == nil { - // Just return other - return o - } - - nc := c.Copy() - - if o == nil { - // Nothing to merge - return nc - } - - if o.Limit > 0 { - nc.Limit = o.Limit - } - - if o.Grace != nil { - nc.Grace = o.Grace - } - - if o.IgnoreWarnings { - nc.IgnoreWarnings = o.IgnoreWarnings - } - - return nc -} - -// The ServiceCheck data model represents the consul health check that -// Nomad registers for a Task -type ServiceCheck struct { - Id string - Name string - Type string - Command string - Args []string - Path string - Protocol string - PortLabel string `mapstructure:"port"` - AddressMode string `mapstructure:"address_mode"` - Interval time.Duration - Timeout time.Duration - InitialStatus string `mapstructure:"initial_status"` - TLSSkipVerify bool `mapstructure:"tls_skip_verify"` - Header map[string][]string - Method string - CheckRestart *CheckRestart `mapstructure:"check_restart"` - GRPCService string `mapstructure:"grpc_service"` - GRPCUseTLS bool `mapstructure:"grpc_use_tls"` -} - -// The Service model represents a Consul service definition -type Service struct { - Id string - Name string - Tags []string - CanaryTags []string `mapstructure:"canary_tags"` - PortLabel string `mapstructure:"port"` - AddressMode string `mapstructure:"address_mode"` - Checks []ServiceCheck - CheckRestart *CheckRestart `mapstructure:"check_restart"` - Connect *ConsulConnect -} - -func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { - if s.Name == "" { - s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name) - } - - // Default to AddressModeAuto - if s.AddressMode == "" { - s.AddressMode = "auto" - } - - // Canonicalize CheckRestart on Checks and merge Service.CheckRestart - // into each check. - for i, check := range s.Checks { - s.Checks[i].CheckRestart = s.CheckRestart.Merge(check.CheckRestart) - s.Checks[i].CheckRestart.Canonicalize() - } -} - -type ConsulConnect struct { - SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"` -} - -type ConsulSidecarService struct { - Port string - Proxy *ConsulProxy -} - -type ConsulProxy struct { - Upstreams []*ConsulUpstream -} - -type ConsulUpstream struct { - //FIXME Pointers? - DestinationName string `mapstructure:"destination_name"` - LocalBindPort int `mapstructure:"local_bind_port"` -} - // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { Sticky *bool @@ -629,6 +491,9 @@ func (g *TaskGroup) Canonicalize(job *Job) { for _, n := range g.Networks { n.Canonicalize() } + for _, s := range g.Services { + s.Canonicalize(nil, g, job) + } } // Constrain is used to add a constraint to a task group. diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index a42ce63de..89ed5bfa3 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -998,7 +998,9 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service { continue } - out[i].Connect = &structs.ConsulConnect{} + out[i].Connect = &structs.ConsulConnect{ + Native: s.Connect.Native, + } if s.Connect.SidecarService == nil { continue @@ -1012,7 +1014,9 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service { continue } - out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{} + out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{ + Config: s.Connect.SidecarService.Proxy.Config, + } upstreams := make([]*structs.ConsulUpstream, len(s.Connect.SidecarService.Proxy.Upstreams)) for i, p := range s.Connect.SidecarService.Proxy.Upstreams { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index ba2912bbc..7fd5ab3f7 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1492,10 +1492,43 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { ProgressDeadline: helper.TimeToPtr(5 * time.Minute), AutoRevert: helper.BoolToPtr(true), }, - Meta: map[string]string{ "key": "value", }, + Services: []*api.Service{ + { + Name: "groupserviceA", + Tags: []string{"a", "b"}, + CanaryTags: []string{"d", "e"}, + PortLabel: "1234", + CheckRestart: &api.CheckRestart{ + Limit: 4, + Grace: helper.TimeToPtr(11 * time.Second), + }, + Checks: []api.ServiceCheck{ + { + Id: "hello", + Name: "bar", + Type: "http", + Command: "foo", + Args: []string{"a", "b"}, + Path: "/check", + Protocol: "http", + PortLabel: "foo", + AddressMode: "driver", + GRPCService: "foo.Bar", + GRPCUseTLS: true, + Interval: 4 * time.Second, + Timeout: 2 * time.Second, + InitialStatus: "ok", + CheckRestart: &api.CheckRestart{ + Limit: 3, + IgnoreWarnings: true, + }, + }, + }, + }, + }, Tasks: []*api.Task{ { Name: "task1", @@ -1798,6 +1831,37 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Meta: map[string]string{ "key": "value", }, + Services: []*structs.Service{ + { + Name: "groupserviceA", + Tags: []string{"a", "b"}, + CanaryTags: []string{"d", "e"}, + PortLabel: "1234", + AddressMode: "auto", + Checks: []*structs.ServiceCheck{ + { + Name: "bar", + Type: "http", + Command: "foo", + Args: []string{"a", "b"}, + Path: "/check", + Protocol: "http", + PortLabel: "foo", + AddressMode: "driver", + GRPCService: "foo.Bar", + GRPCUseTLS: true, + Interval: 4 * time.Second, + Timeout: 2 * time.Second, + InitialStatus: "ok", + CheckRestart: &structs.CheckRestart{ + Grace: 11 * time.Second, + Limit: 3, + IgnoreWarnings: true, + }, + }, + }, + }, + }, Tasks: []*structs.Task{ { Name: "task1", diff --git a/jobspec/parse.go b/jobspec/parse.go index c0a7dc608..f8d4aa341 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -947,9 +947,12 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list } if o := listVal.Filter("service"); len(o.Items) > 0 { - if err := parseServices(jobName, taskGroupName, &t, o); err != nil { + services, err := parseServices(jobName, taskGroupName, o) + if err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s',", n)) } + + t.Services = services } // If we have config, then parse that @@ -1287,8 +1290,8 @@ func parseGroupServices(jobName string, taskGroupName string, g *api.TaskGroup, return nil } -func parseServices(jobName string, taskGroupName string, task *api.Task, serviceObjs *ast.ObjectList) error { - task.Services = make([]*api.Service, len(serviceObjs.Items)) +func parseServices(jobName string, taskGroupName string, serviceObjs *ast.ObjectList) ([]*api.Service, error) { + services := make([]*api.Service, len(serviceObjs.Items)) for idx, o := range serviceObjs.Items { // Check for invalid keys valid := []string{ @@ -1299,22 +1302,24 @@ func parseServices(jobName string, taskGroupName string, task *api.Task, service "check", "address_mode", "check_restart", + "connect", } if err := helper.CheckHCLKeys(o.Val, valid); err != nil { - return multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx)) + return nil, multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx)) } var service api.Service var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { - return err + return nil, err } delete(m, "check") delete(m, "check_restart") + delete(m, "connect") if err := mapstructure.WeakDecode(m, &service); err != nil { - return err + return nil, err } // Filter checks @@ -1322,33 +1327,236 @@ func parseServices(jobName string, taskGroupName string, task *api.Task, service if ot, ok := o.Val.(*ast.ObjectType); ok { checkList = ot.List } else { - return fmt.Errorf("service '%s': should be an object", service.Name) + return nil, fmt.Errorf("service '%s': should be an object", service.Name) } if co := checkList.Filter("check"); len(co.Items) > 0 { if err := parseChecks(&service, co); err != nil { - return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) } } // Filter check_restart if cro := checkList.Filter("check_restart"); len(cro.Items) > 0 { if len(cro.Items) > 1 { - return fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name) + return nil, fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name) } if cr, err := parseCheckRestart(cro.Items[0]); err != nil { - return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) } else { service.CheckRestart = cr } } - task.Services[idx] = &service + // Filter connect + if co := checkList.Filter("connect"); len(co.Items) > 0 { + if len(co.Items) > 1 { + return nil, fmt.Errorf("connect '%s': cannot have more than 1 connect stanza", service.Name) + } + + c, err := parseConnect(co.Items[0]) + if err != nil { + return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + } + + service.Connect = c + } + + services[idx] = &service } - return nil + return services, nil } +func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) { + valid := []string{ + "native", + "sidecar_service", + } + + if err := helper.CheckHCLKeys(co.Val, valid); err != nil { + return nil, multierror.Prefix(err, "connect ->") + } + + var connect api.ConsulConnect + var m map[string]interface{} + if err := hcl.DecodeObject(&m, co.Val); err != nil { + return nil, err + } + + delete(m, "sidecar_service") + + if err := mapstructure.WeakDecode(m, &connect); err != nil { + return nil, err + } + + var connectList *ast.ObjectList + if ot, ok := co.Val.(*ast.ObjectType); ok { + connectList = ot.List + } else { + return nil, fmt.Errorf("connect should be an object") + } + + // Parse the sidecar_service + o := connectList.Filter("sidecar_service") + if len(o.Items) == 0 { + return &connect, nil + } + if len(o.Items) > 1 { + return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task") + } + + r, err := parseSidecarService(o.Items[0]) + if err != nil { + return nil, fmt.Errorf("sidecar_service, %v", err) + } + connect.SidecarService = r + + return &connect, nil +} + +func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) { + valid := []string{ + "port", + "proxy", + } + + if err := helper.CheckHCLKeys(o.Val, valid); err != nil { + return nil, multierror.Prefix(err, "sidecar_service ->") + } + + var sidecar api.ConsulSidecarService + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return nil, err + } + + delete(m, "proxy") + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &sidecar, + }) + if err != nil { + return nil, err + } + if err := dec.Decode(m); err != nil { + return nil, fmt.Errorf("foo: %v", err) + } + + var proxyList *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + proxyList = ot.List + } else { + return nil, fmt.Errorf("sidecar_service: should be an object") + } + + // Parse the proxy + po := proxyList.Filter("proxy") + if len(po.Items) == 0 { + return &sidecar, nil + } + if len(po.Items) > 1 { + return nil, fmt.Errorf("only one 'proxy' block allowed per task") + } + + r, err := parseProxy(po.Items[0]) + if err != nil { + return nil, fmt.Errorf("proxy, %v", err) + } + sidecar.Proxy = r + + return &sidecar, nil +} + +func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) { + valid := []string{ + "upstreams", + "config", + } + + if err := helper.CheckHCLKeys(o.Val, valid); err != nil { + return nil, multierror.Prefix(err, "proxy ->") + } + + var proxy api.ConsulProxy + + var listVal *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + listVal = ot.List + } else { + return nil, fmt.Errorf("proxy: should be an object") + } + + // Parse the proxy + uo := listVal.Filter("upstreams") + proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items)) + for i := range uo.Items { + u, err := parseUpstream(uo.Items[i]) + if err != nil { + return nil, err + } + + proxy.Upstreams[i] = u + } + + // If we have config, then parse that + if o := listVal.Filter("config"); len(o.Items) > 1 { + return nil, fmt.Errorf("only 1 meta object supported") + } else if len(o.Items) == 1 { + var mSlice []map[string]interface{} + if err := hcl.DecodeObject(&mSlice, o.Items[0].Val); err != nil { + return nil, err + } + + if len(mSlice) > 1 { + return nil, fmt.Errorf("only 1 meta object supported") + } + + m := mSlice[0] + + if err := mapstructure.WeakDecode(m, &proxy.Config); err != nil { + return nil, err + } + + proxy.Config = flattenMapSlice(proxy.Config) + } + + return &proxy, nil +} + +func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) { + valid := []string{ + "destination_name", + "local_bind_port", + } + + if err := helper.CheckHCLKeys(uo.Val, valid); err != nil { + return nil, multierror.Prefix(err, "upstream ->") + } + + var upstream api.ConsulUpstream + var m map[string]interface{} + if err := hcl.DecodeObject(&m, uo.Val); err != nil { + return nil, err + } + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &upstream, + }) + if err != nil { + return nil, err + } + + if err := dec.Decode(m); err != nil { + return nil, err + } + + return &upstream, nil +} func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { service.Checks = make([]api.ServiceCheck, len(checkObjs.Items)) for idx, co := range checkObjs.Items { @@ -1483,162 +1691,6 @@ func parseCheckRestart(cro *ast.ObjectItem) (*api.CheckRestart, error) { return &checkRestart, nil } -func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) { - valid := []string{ - "sidecar_service", - } - - if err := helper.CheckHCLKeys(co.Val, valid); err != nil { - return nil, multierror.Prefix(err, "connect ->") - } - - var connect api.ConsulConnect - - var connectList *ast.ObjectList - if ot, ok := co.Val.(*ast.ObjectType); ok { - connectList = ot.List - } else { - return nil, fmt.Errorf("connect should be an object") - } - - // Parse the sidecar_service - o := connectList.Filter("sidecar_service") - if len(o.Items) == 0 { - return nil, nil - } - if len(o.Items) > 1 { - return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task") - } - - r, err := parseSidecarService(o.Items[0]) - if err != nil { - return nil, fmt.Errorf("sidecar_service, %v", err) - } - connect.SidecarService = r - - return &connect, nil -} - -func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) { - valid := []string{ - "port", - "proxy", - } - - if err := helper.CheckHCLKeys(o.Val, valid); err != nil { - return nil, multierror.Prefix(err, "sidecar_service ->") - } - - var sidecar api.ConsulSidecarService - var m map[string]interface{} - if err := hcl.DecodeObject(&m, o.Val); err != nil { - return nil, err - } - - delete(m, "proxy") - - dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - DecodeHook: mapstructure.StringToTimeDurationHookFunc(), - WeaklyTypedInput: true, - Result: &sidecar, - }) - if err != nil { - return nil, err - } - if err := dec.Decode(m); err != nil { - return nil, fmt.Errorf("foo: %v", err) - } - - var proxyList *ast.ObjectList - if ot, ok := o.Val.(*ast.ObjectType); ok { - proxyList = ot.List - } else { - return nil, fmt.Errorf("sidecar_service: should be an object") - } - - // Parse the proxy - po := proxyList.Filter("proxy") - if len(po.Items) == 0 { - return &sidecar, nil - } - if len(po.Items) > 1 { - return nil, fmt.Errorf("only one 'proxy' block allowed per task") - } - - r, err := parseProxy(po.Items[0]) - if err != nil { - return nil, fmt.Errorf("proxy, %v", err) - } - sidecar.Proxy = r - - return &sidecar, nil -} - -func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) { - valid := []string{ - "upstreams", - } - - if err := helper.CheckHCLKeys(o.Val, valid); err != nil { - return nil, multierror.Prefix(err, "proxy ->") - } - - var proxy api.ConsulProxy - - var listVal *ast.ObjectList - if ot, ok := o.Val.(*ast.ObjectType); ok { - listVal = ot.List - } else { - return nil, fmt.Errorf("proxy: should be an object") - } - - // Parse the proxy - uo := listVal.Filter("upstreams") - proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items)) - for i := range uo.Items { - u, err := parseUpstream(uo.Items[i]) - if err != nil { - return nil, err - } - - proxy.Upstreams[i] = u - } - - return &proxy, nil -} - -func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) { - valid := []string{ - "destination_name", - "local_bind_port", - } - - if err := helper.CheckHCLKeys(uo.Val, valid); err != nil { - return nil, multierror.Prefix(err, "upstream ->") - } - - var upstream api.ConsulUpstream - var m map[string]interface{} - if err := hcl.DecodeObject(&m, uo.Val); err != nil { - return nil, err - } - - dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - DecodeHook: mapstructure.StringToTimeDurationHookFunc(), - WeaklyTypedInput: true, - Result: &upstream, - }) - if err != nil { - return nil, err - } - - if err := dec.Decode(m); err != nil { - return nil, err - } - - return &upstream, nil -} - func parseResources(result *api.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index a8c36f6a3..164da424b 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -893,6 +893,26 @@ func TestParse(t *testing.T) { }, }, }, + Services: []*api.Service{ + { + Name: "connect-service", + Tags: []string{"foo", "bar"}, + CanaryTags: []string{"canary", "bam"}, + PortLabel: "1234", + Connect: &api.ConsulConnect{ + SidecarService: &api.ConsulSidecarService{ + Proxy: &api.ConsulProxy{ + Upstreams: []*api.ConsulUpstream{ + { + DestinationName: "other-service", + LocalBindPort: 4567, + }, + }, + }, + }, + }, + }, + }, Tasks: []*api.Task{ { Name: "bar", diff --git a/jobspec/test-fixtures/tg-network.hcl b/jobspec/test-fixtures/tg-network.hcl index 9f921b44f..0abd67c95 100644 --- a/jobspec/test-fixtures/tg-network.hcl +++ b/jobspec/test-fixtures/tg-network.hcl @@ -9,6 +9,25 @@ job "foo" { to = 8080 } } + + service { + name = "connect-service" + tags = ["foo", "bar"] + canary_tags = ["canary", "bam"] + port = "1234" + + connect { + sidecar_service { + proxy { + upstreams { + destination_name = "other-service" + local_bind_port = 4567 + } + } + } + } + } + task "bar" { driver = "raw_exec" config { diff --git a/jobspec/utils.go b/jobspec/utils.go new file mode 100644 index 000000000..62de57aff --- /dev/null +++ b/jobspec/utils.go @@ -0,0 +1,34 @@ +package jobspec + +// flattenMapSlice flattens any occurrences of []map[string]interface{} into +// map[string]interface{}. +func flattenMapSlice(m map[string]interface{}) map[string]interface{} { + newM := make(map[string]interface{}, len(m)) + + for k, v := range m { + var newV interface{} + + switch mapV := v.(type) { + case []map[string]interface{}: + // Recurse into each map and flatten values + newMap := map[string]interface{}{} + for _, innerM := range mapV { + for innerK, innerV := range flattenMapSlice(innerM) { + newMap[innerK] = innerV + } + } + newV = newMap + + case map[string]interface{}: + // Recursively flatten maps + newV = flattenMapSlice(mapV) + + default: + newV = v + } + + newM[k] = newV + } + + return newM +} diff --git a/jobspec/utils_test.go b/jobspec/utils_test.go new file mode 100644 index 000000000..c571b7cba --- /dev/null +++ b/jobspec/utils_test.go @@ -0,0 +1,41 @@ +package jobspec + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestFlattenMapSlice asserts flattenMapSlice recursively flattens a slice of maps into a +// single map. +func TestFlattenMapSlice(t *testing.T) { + t.Parallel() + + input := map[string]interface{}{ + "foo": 123, + "bar": []map[string]interface{}{ + { + "baz": 456, + }, + { + "baz": 789, + }, + { + "baax": true, + }, + }, + "nil": nil, + } + + output := map[string]interface{}{ + "foo": 123, + "bar": map[string]interface{}{ + "baz": 789, + "baax": true, + }, + "nil": nil, + } + + require.Equal(t, output, flattenMapSlice(input)) + +} diff --git a/nomad/structs/consul.go b/nomad/structs/consul.go deleted file mode 100644 index a184039d5..000000000 --- a/nomad/structs/consul.go +++ /dev/null @@ -1,109 +0,0 @@ -package structs - -type ConsulConnect struct { - SidecarService *ConsulSidecarService -} - -func (c *ConsulConnect) Copy() *ConsulConnect { - return &ConsulConnect{ - SidecarService: c.SidecarService.Copy(), - } -} - -func (c *ConsulConnect) Equals(o *ConsulConnect) bool { - if c == nil || o == nil { - return c == o - } - - return c.SidecarService.Equals(o.SidecarService) -} - -func (c *ConsulConnect) HasSidecar() bool { - return c != nil && c.SidecarService != nil -} - -type ConsulSidecarService struct { - Port string - Proxy *ConsulProxy -} - -func (s *ConsulSidecarService) Copy() *ConsulSidecarService { - return &ConsulSidecarService{ - Port: s.Port, - Proxy: s.Proxy.Copy(), - } -} - -func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool { - if s == nil || o == nil { - return s == o - } - - if s.Port != o.Port { - return false - } - - return s.Proxy.Equals(o.Proxy) -} - -type ConsulProxy struct { - Upstreams []*ConsulUpstream -} - -func (p *ConsulProxy) Copy() *ConsulProxy { - upstreams := make([]*ConsulUpstream, len(p.Upstreams)) - - for i := range p.Upstreams { - upstreams[i] = p.Upstreams[i].Copy() - } - - return &ConsulProxy{ - Upstreams: upstreams, - } -} - -func (p *ConsulProxy) Equals(o *ConsulProxy) bool { - if p == nil || o == nil { - return p == o - } - - if len(p.Upstreams) != len(o.Upstreams) { - return false - } - - // Order doesn't matter -OUTER: - for _, up := range p.Upstreams { - for _, innerUp := range o.Upstreams { - if up.Equals(innerUp) { - // Match; find next upstream - continue OUTER - } - } - - // No match - return false - } - - return true -} - -type ConsulUpstream struct { - DestinationName string - LocalBindPort int -} - -func (u *ConsulUpstream) Copy() *ConsulUpstream { - return &ConsulUpstream{ - DestinationName: u.DestinationName, - LocalBindPort: u.LocalBindPort, - } -} - -func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool { - if u == nil || o == nil { - return u == o - } - - return (*u) == (*o) -} diff --git a/nomad/structs/services.go b/nomad/structs/services.go new file mode 100644 index 000000000..abb53dddc --- /dev/null +++ b/nomad/structs/services.go @@ -0,0 +1,695 @@ +package structs + +import ( + "crypto/sha1" + "fmt" + "io" + "net/url" + "reflect" + "regexp" + "sort" + "strings" + "time" + + "github.com/hashicorp/consul/api" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/args" +) + +const ( + ServiceCheckHTTP = "http" + ServiceCheckTCP = "tcp" + ServiceCheckScript = "script" + ServiceCheckGRPC = "grpc" + + // minCheckInterval is the minimum check interval permitted. Consul + // currently has its MinInterval set to 1s. Mirror that here for + // consistency. + minCheckInterval = 1 * time.Second + + // minCheckTimeout is the minimum check timeout permitted for Consul + // script TTL checks. + minCheckTimeout = 1 * time.Second +) + +// ServiceCheck represents the Consul health check. +type ServiceCheck struct { + Name string // Name of the check, defaults to id + Type string // Type of the check - tcp, http, docker and script + Command string // Command is the command to run for script checks + Args []string // Args is a list of arguments for script checks + Path string // path of the health check url for http type check + Protocol string // Protocol to use if check is http, defaults to http + PortLabel string // The port to use for tcp/http checks + AddressMode string // 'host' to use host ip:port or 'driver' to use driver's + Interval time.Duration // Interval of the check + Timeout time.Duration // Timeout of the response from the check before consul fails the check + InitialStatus string // Initial status of the check + TLSSkipVerify bool // Skip TLS verification when Protocol=https + Method string // HTTP Method to use (GET by default) + Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks + CheckRestart *CheckRestart // If and when a task should be restarted based on checks + GRPCService string // Service for GRPC checks + GRPCUseTLS bool // Whether or not to use TLS for GRPC checks +} + +// Copy the stanza recursively. Returns nil if nil. +func (sc *ServiceCheck) Copy() *ServiceCheck { + if sc == nil { + return nil + } + nsc := new(ServiceCheck) + *nsc = *sc + nsc.Args = helper.CopySliceString(sc.Args) + nsc.Header = helper.CopyMapStringSliceString(sc.Header) + nsc.CheckRestart = sc.CheckRestart.Copy() + return nsc +} + +// Equals returns true if the structs are recursively equal. +func (sc *ServiceCheck) Equals(o *ServiceCheck) bool { + if sc == nil || o == nil { + return sc == o + } + + if sc.Name != o.Name { + return false + } + + if sc.AddressMode != o.AddressMode { + return false + } + + if !helper.CompareSliceSetString(sc.Args, o.Args) { + return false + } + + if !sc.CheckRestart.Equals(o.CheckRestart) { + return false + } + + if sc.Command != o.Command { + return false + } + + if sc.GRPCService != o.GRPCService { + return false + } + + if sc.GRPCUseTLS != o.GRPCUseTLS { + return false + } + + // Use DeepEqual here as order of slice values could matter + if !reflect.DeepEqual(sc.Header, o.Header) { + return false + } + + if sc.InitialStatus != o.InitialStatus { + return false + } + + if sc.Interval != o.Interval { + return false + } + + if sc.Method != o.Method { + return false + } + + if sc.Path != o.Path { + return false + } + + if sc.PortLabel != o.Path { + return false + } + + if sc.Protocol != o.Protocol { + return false + } + + if sc.TLSSkipVerify != o.TLSSkipVerify { + return false + } + + if sc.Timeout != o.Timeout { + return false + } + + if sc.Type != o.Type { + return false + } + + return true +} + +func (sc *ServiceCheck) Canonicalize(serviceName string) { + // Ensure empty maps/slices are treated as null to avoid scheduling + // issues when using DeepEquals. + if len(sc.Args) == 0 { + sc.Args = nil + } + + if len(sc.Header) == 0 { + sc.Header = nil + } else { + for k, v := range sc.Header { + if len(v) == 0 { + sc.Header[k] = nil + } + } + } + + if sc.Name == "" { + sc.Name = fmt.Sprintf("service: %q check", serviceName) + } +} + +// validate a Service's ServiceCheck +func (sc *ServiceCheck) validate() error { + // Validate Type + switch strings.ToLower(sc.Type) { + case ServiceCheckGRPC: + case ServiceCheckTCP: + case ServiceCheckHTTP: + if sc.Path == "" { + return fmt.Errorf("http type must have a valid http path") + } + url, err := url.Parse(sc.Path) + if err != nil { + return fmt.Errorf("http type must have a valid http path") + } + if url.IsAbs() { + return fmt.Errorf("http type must have a relative http path") + } + + case ServiceCheckScript: + if sc.Command == "" { + return fmt.Errorf("script type must have a valid script path") + } + + default: + return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type) + } + + // Validate interval and timeout + if sc.Interval == 0 { + return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval) + } else if sc.Interval < minCheckInterval { + return fmt.Errorf("interval (%v) cannot be lower than %v", sc.Interval, minCheckInterval) + } + + if sc.Timeout == 0 { + return fmt.Errorf("missing required value timeout. Timeout cannot be less than %v", minCheckInterval) + } else if sc.Timeout < minCheckTimeout { + return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval) + } + + // Validate InitialStatus + switch sc.InitialStatus { + case "": + case api.HealthPassing: + case api.HealthWarning: + case api.HealthCritical: + default: + return fmt.Errorf(`invalid initial check state (%s), must be one of %q, %q, %q or empty`, sc.InitialStatus, api.HealthPassing, api.HealthWarning, api.HealthCritical) + + } + + // Validate AddressMode + switch sc.AddressMode { + case "", AddressModeHost, AddressModeDriver: + // Ok + case AddressModeAuto: + return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto) + default: + return fmt.Errorf("invalid address_mode %q", sc.AddressMode) + } + + return sc.CheckRestart.Validate() +} + +// RequiresPort returns whether the service check requires the task has a port. +func (sc *ServiceCheck) RequiresPort() bool { + switch sc.Type { + case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP: + return true + default: + return false + } +} + +// TriggersRestarts returns true if this check should be watched and trigger a restart +// on failure. +func (sc *ServiceCheck) TriggersRestarts() bool { + return sc.CheckRestart != nil && sc.CheckRestart.Limit > 0 +} + +// Hash all ServiceCheck fields and the check's corresponding service ID to +// create an identifier. The identifier is not guaranteed to be unique as if +// the PortLabel is blank, the Service's PortLabel will be used after Hash is +// called. +func (sc *ServiceCheck) Hash(serviceID string) string { + h := sha1.New() + io.WriteString(h, serviceID) + io.WriteString(h, sc.Name) + io.WriteString(h, sc.Type) + io.WriteString(h, sc.Command) + io.WriteString(h, strings.Join(sc.Args, "")) + io.WriteString(h, sc.Path) + io.WriteString(h, sc.Protocol) + io.WriteString(h, sc.PortLabel) + io.WriteString(h, sc.Interval.String()) + io.WriteString(h, sc.Timeout.String()) + io.WriteString(h, sc.Method) + // Only include TLSSkipVerify if set to maintain ID stability with Nomad <0.6 + if sc.TLSSkipVerify { + io.WriteString(h, "true") + } + + // Since map iteration order isn't stable we need to write k/v pairs to + // a slice and sort it before hashing. + if len(sc.Header) > 0 { + headers := make([]string, 0, len(sc.Header)) + for k, v := range sc.Header { + headers = append(headers, k+strings.Join(v, "")) + } + sort.Strings(headers) + io.WriteString(h, strings.Join(headers, "")) + } + + // Only include AddressMode if set to maintain ID stability with Nomad <0.7.1 + if len(sc.AddressMode) > 0 { + io.WriteString(h, sc.AddressMode) + } + + // Only include GRPC if set to maintain ID stability with Nomad <0.8.4 + if sc.GRPCService != "" { + io.WriteString(h, sc.GRPCService) + } + if sc.GRPCUseTLS { + io.WriteString(h, "true") + } + + return fmt.Sprintf("%x", h.Sum(nil)) +} + +const ( + AddressModeAuto = "auto" + AddressModeHost = "host" + AddressModeDriver = "driver" +) + +// Service represents a Consul service definition +type Service struct { + // Name of the service registered with Consul. Consul defaults the + // Name to ServiceID if not specified. The Name if specified is used + // as one of the seed values when generating a Consul ServiceID. + Name string + + // PortLabel is either the numeric port number or the `host:port`. + // To specify the port number using the host's Consul Advertise + // address, specify an empty host in the PortLabel (e.g. `:port`). + PortLabel string + + // AddressMode specifies whether or not to use the host ip:port for + // this service. + AddressMode string + + Tags []string // List of tags for the service + CanaryTags []string // List of tags for the service when it is a canary + Checks []*ServiceCheck // List of checks associated with the service + Connect *ConsulConnect // Consul Connect configuration +} + +// Copy the stanza recursively. Returns nil if nil. +func (s *Service) Copy() *Service { + if s == nil { + return nil + } + ns := new(Service) + *ns = *s + ns.Tags = helper.CopySliceString(ns.Tags) + ns.CanaryTags = helper.CopySliceString(ns.CanaryTags) + + if s.Checks != nil { + checks := make([]*ServiceCheck, len(ns.Checks)) + for i, c := range ns.Checks { + checks[i] = c.Copy() + } + ns.Checks = checks + } + + ns.Connect = s.Connect.Copy() + + return ns +} + +// Canonicalize interpolates values of Job, Task Group and Task in the Service +// Name. This also generates check names, service id and check ids. +func (s *Service) Canonicalize(job string, taskGroup string, task string) { + // Ensure empty lists are treated as null to avoid scheduler issues when + // using DeepEquals + if len(s.Tags) == 0 { + s.Tags = nil + } + if len(s.CanaryTags) == 0 { + s.CanaryTags = nil + } + if len(s.Checks) == 0 { + s.Checks = nil + } + + s.Name = args.ReplaceEnv(s.Name, map[string]string{ + "JOB": job, + "TASKGROUP": taskGroup, + "TASK": task, + "BASE": fmt.Sprintf("%s-%s-%s", job, taskGroup, task), + }, + ) + + for _, check := range s.Checks { + check.Canonicalize(s.Name) + } +} + +// Validate checks if the Check definition is valid +func (s *Service) Validate() error { + var mErr multierror.Error + + // Ensure the service name is valid per the below RFCs but make an exception + // for our interpolation syntax by first stripping any environment variables from the name + + serviceNameStripped := args.ReplaceEnvWithPlaceHolder(s.Name, "ENV-VAR") + + if err := s.ValidateName(serviceNameStripped); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes: %q", s.Name)) + } + + switch s.AddressMode { + case "", AddressModeAuto, AddressModeHost, AddressModeDriver: + // OK + default: + mErr.Errors = append(mErr.Errors, fmt.Errorf("service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode)) + } + + for _, c := range s.Checks { + if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() { + mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: check requires a port but neither check nor service %+q have a port", c.Name, s.Name)) + continue + } + + if err := c.validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: %v", c.Name, err)) + } + } + + if s.Connect != nil { + if err := s.Connect.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + + return mErr.ErrorOrNil() +} + +// ValidateName checks if the services Name is valid and should be called after +// the name has been interpolated +func (s *Service) ValidateName(name string) error { + // Ensure the service name is valid per RFC-952 §1 + // (https://tools.ietf.org/html/rfc952), RFC-1123 §2.1 + // (https://tools.ietf.org/html/rfc1123), and RFC-2782 + // (https://tools.ietf.org/html/rfc2782). + re := regexp.MustCompile(`^(?i:[a-z0-9]|[a-z0-9][a-z0-9\-]{0,61}[a-z0-9])$`) + if !re.MatchString(name) { + return fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes and must be no longer than 63 characters: %q", name) + } + return nil +} + +// Hash returns a base32 encoded hash of a Service's contents excluding checks +// as they're hashed independently. +func (s *Service) Hash(allocID, taskName string, canary bool) string { + h := sha1.New() + io.WriteString(h, allocID) + io.WriteString(h, taskName) + io.WriteString(h, s.Name) + io.WriteString(h, s.PortLabel) + io.WriteString(h, s.AddressMode) + for _, tag := range s.Tags { + io.WriteString(h, tag) + } + for _, tag := range s.CanaryTags { + io.WriteString(h, tag) + } + + // Vary ID on whether or not CanaryTags will be used + if canary { + h.Write([]byte("Canary")) + } + + // Base32 is used for encoding the hash as sha1 hashes can always be + // encoded without padding, only 4 bytes larger than base64, and saves + // 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice + // to have a reasonably compact URL-safe representation. + return b32.EncodeToString(h.Sum(nil)) +} + +// Equals returns true if the structs are recursively equal. +func (s *Service) Equals(o *Service) bool { + if s == nil || o == nil { + return s == o + } + + if s.AddressMode != o.AddressMode { + return false + } + + if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) { + return false + } + + if len(s.Checks) != len(o.Checks) { + return false + } + +OUTER: + for i := range s.Checks { + for ii := range o.Checks { + if s.Checks[i].Equals(o.Checks[ii]) { + // Found match; continue with next check + continue OUTER + } + } + + // No match + return false + } + + if !s.Connect.Equals(o.Connect) { + return false + } + + if s.Name != o.Name { + return false + } + + if s.PortLabel != o.PortLabel { + return false + } + + if !helper.CompareSliceSetString(s.Tags, o.Tags) { + return false + } + + return true +} + +// ConsulConnect represents a Consul Connect jobspec stanza. +type ConsulConnect struct { + // Native is true if a service implements Connect directly and does not + // need a sidecar. + Native bool + + // SidecarService is non-nil if a service requires a sidecar. + SidecarService *ConsulSidecarService +} + +// Copy the stanza recursively. Returns nil if nil. +func (c *ConsulConnect) Copy() *ConsulConnect { + if c == nil { + return nil + } + + return &ConsulConnect{ + Native: c.Native, + SidecarService: c.SidecarService.Copy(), + } +} + +// Equals returns true if the structs are recursively equal. +func (c *ConsulConnect) Equals(o *ConsulConnect) bool { + if c == nil || o == nil { + return c == o + } + + if c.Native != o.Native { + return false + } + + return c.SidecarService.Equals(o.SidecarService) +} + +// Validate that the Connect stanza has exactly one of Native or sidecar. +func (c *ConsulConnect) Validate() error { + if c == nil { + return nil + } + + if c.Native && c.SidecarService != nil { + return fmt.Errorf("Consul Connect must be native or use a sidecar service; not both") + } + + if !c.Native && c.SidecarService == nil { + return fmt.Errorf("Consul Connect must be native or use a sidecar service") + } + + return nil +} + +// ConsulSidecarService represents a Consul Connect SidecarService jobspec +// stanza. +type ConsulSidecarService struct { + // Port is the service's port that the sidecar will connect to. May be + // a port label or a literal port number. + Port string + + // Proxy stanza defining the sidecar proxy configuration. + Proxy *ConsulProxy +} + +// Copy the stanza recursively. Returns nil if nil. +func (s *ConsulSidecarService) Copy() *ConsulSidecarService { + return &ConsulSidecarService{ + Port: s.Port, + Proxy: s.Proxy.Copy(), + } +} + +// Equals returns true if the structs are recursively equal. +func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool { + if s == nil || o == nil { + return s == o + } + + if s.Port != o.Port { + return false + } + + return s.Proxy.Equals(o.Proxy) +} + +// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza. +type ConsulProxy struct { + // Upstreams configures the upstream services this service intends to + // connect to. + Upstreams []*ConsulUpstream + + // Config is a proxy configuration. It is opaque to Nomad and passed + // directly to Consul. + Config map[string]interface{} +} + +// Copy the stanza recursively. Returns nil if nil. +func (p *ConsulProxy) Copy() *ConsulProxy { + if p == nil { + return nil + } + + newP := ConsulProxy{} + + if n := len(p.Upstreams); n > 0 { + newP.Upstreams = make([]*ConsulUpstream, n) + + for i := range p.Upstreams { + newP.Upstreams[i] = p.Upstreams[i].Copy() + } + } + + if n := len(p.Config); n > 0 { + newP.Config = make(map[string]interface{}, n) + + for k, v := range p.Config { + newP.Config[k] = v + } + } + + return &newP +} + +// Equals returns true if the structs are recursively equal. +func (p *ConsulProxy) Equals(o *ConsulProxy) bool { + if p == nil || o == nil { + return p == o + } + + if len(p.Upstreams) != len(o.Upstreams) { + return false + } + + // Order doesn't matter +OUTER: + for _, up := range p.Upstreams { + for _, innerUp := range o.Upstreams { + if up.Equals(innerUp) { + // Match; find next upstream + continue OUTER + } + } + + // No match + return false + } + + // Avoid nil vs {} differences + if len(p.Config) != 0 && len(o.Config) != 0 { + if !reflect.DeepEqual(p.Config, o.Config) { + return false + } + } + + return true +} + +// ConsulUpstream represents a Consul Connect upstream jobspec stanza. +type ConsulUpstream struct { + // DestinationName is the name of the upstream service. + DestinationName string + + // LocalBindPort is the port the proxy will receive connections for the + // upstream on. + LocalBindPort int +} + +// Copy the stanza recursively. Returns nil if nil. +func (u *ConsulUpstream) Copy() *ConsulUpstream { + if u == nil { + return nil + } + + return &ConsulUpstream{ + DestinationName: u.DestinationName, + LocalBindPort: u.LocalBindPort, + } +} + +// Equals returns true if the structs are recursively equal. +func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool { + if u == nil || o == nil { + return u == o + } + + return (*u) == (*o) +} diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go new file mode 100644 index 000000000..b1f265999 --- /dev/null +++ b/nomad/structs/services_test.go @@ -0,0 +1,62 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestConsulConnect_Validate(t *testing.T) { + t.Parallel() + + c := &ConsulConnect{} + + // An empty Connect stanza is invalid + require.Error(t, c.Validate()) + + // Native=true is valid + c.Native = true + require.NoError(t, c.Validate()) + + // Native=true + Sidecar!=nil is invalid + c.SidecarService = &ConsulSidecarService{} + require.Error(t, c.Validate()) + + // Native=false + Sidecar!=nil is valid + c.Native = false + require.NoError(t, c.Validate()) +} + +func TestConsulConnect_CopyEquals(t *testing.T) { + t.Parallel() + + c := &ConsulConnect{ + SidecarService: &ConsulSidecarService{ + Port: "9001", + Proxy: &ConsulProxy{ + Upstreams: []*ConsulUpstream{ + { + DestinationName: "up1", + LocalBindPort: 9002, + }, + { + DestinationName: "up2", + LocalBindPort: 9003, + }, + }, + Config: map[string]interface{}{ + "foo": 1, + }, + }, + }, + } + + require.NoError(t, c.Validate()) + + // Copies should be equivalent + o := c.Copy() + require.True(t, c.Equals(o)) + + o.SidecarService.Proxy.Upstreams = nil + require.False(t, c.Equals(o)) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 59851fcf9..a160a6f2c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -12,10 +12,8 @@ import ( "encoding/hex" "errors" "fmt" - "io" "math" "net" - "net/url" "os" "path/filepath" "reflect" @@ -26,7 +24,6 @@ import ( "time" "github.com/gorhill/cronexpr" - "github.com/hashicorp/consul/api" hcodec "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" @@ -5039,485 +5036,6 @@ func (c *CheckRestart) Validate() error { return mErr.ErrorOrNil() } -const ( - ServiceCheckHTTP = "http" - ServiceCheckTCP = "tcp" - ServiceCheckScript = "script" - ServiceCheckGRPC = "grpc" - - // minCheckInterval is the minimum check interval permitted. Consul - // currently has its MinInterval set to 1s. Mirror that here for - // consistency. - minCheckInterval = 1 * time.Second - - // minCheckTimeout is the minimum check timeout permitted for Consul - // script TTL checks. - minCheckTimeout = 1 * time.Second -) - -// The ServiceCheck data model represents the consul health check that -// Nomad registers for a Task -type ServiceCheck struct { - Name string // Name of the check, defaults to id - Type string // Type of the check - tcp, http, docker and script - Command string // Command is the command to run for script checks - Args []string // Args is a list of arguments for script checks - Path string // path of the health check url for http type check - Protocol string // Protocol to use if check is http, defaults to http - PortLabel string // The port to use for tcp/http checks - AddressMode string // 'host' to use host ip:port or 'driver' to use driver's - Interval time.Duration // Interval of the check - Timeout time.Duration // Timeout of the response from the check before consul fails the check - InitialStatus string // Initial status of the check - TLSSkipVerify bool // Skip TLS verification when Protocol=https - Method string // HTTP Method to use (GET by default) - Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks - CheckRestart *CheckRestart // If and when a task should be restarted based on checks - GRPCService string // Service for GRPC checks - GRPCUseTLS bool // Whether or not to use TLS for GRPC checks -} - -func (sc *ServiceCheck) Copy() *ServiceCheck { - if sc == nil { - return nil - } - nsc := new(ServiceCheck) - *nsc = *sc - nsc.Args = helper.CopySliceString(sc.Args) - nsc.Header = helper.CopyMapStringSliceString(sc.Header) - nsc.CheckRestart = sc.CheckRestart.Copy() - return nsc -} - -func (sc *ServiceCheck) Equals(o *ServiceCheck) bool { - if sc == nil || o == nil { - return sc == o - } - - if sc.Name != o.Name { - return false - } - - if sc.AddressMode != o.AddressMode { - return false - } - - if !helper.CompareSliceSetString(sc.Args, o.Args) { - return false - } - - if !sc.CheckRestart.Equals(o.CheckRestart) { - return false - } - - if sc.Command != o.Command { - return false - } - - if sc.GRPCService != o.GRPCService { - return false - } - - if sc.GRPCUseTLS != o.GRPCUseTLS { - return false - } - - // Use DeepEqual here as order of slice values could matter - if !reflect.DeepEqual(sc.Header, o.Header) { - return false - } - - if sc.InitialStatus != o.InitialStatus { - return false - } - - if sc.Interval != o.Interval { - return false - } - - if sc.Method != o.Method { - return false - } - - if sc.Path != o.Path { - return false - } - - if sc.PortLabel != o.Path { - return false - } - - if sc.Protocol != o.Protocol { - return false - } - - if sc.TLSSkipVerify != o.TLSSkipVerify { - return false - } - - if sc.Timeout != o.Timeout { - return false - } - - if sc.Type != o.Type { - return false - } - - return true -} - -func (sc *ServiceCheck) Canonicalize(serviceName string) { - // Ensure empty maps/slices are treated as null to avoid scheduling - // issues when using DeepEquals. - if len(sc.Args) == 0 { - sc.Args = nil - } - - if len(sc.Header) == 0 { - sc.Header = nil - } else { - for k, v := range sc.Header { - if len(v) == 0 { - sc.Header[k] = nil - } - } - } - - if sc.Name == "" { - sc.Name = fmt.Sprintf("service: %q check", serviceName) - } -} - -// validate a Service's ServiceCheck -func (sc *ServiceCheck) validate() error { - // Validate Type - switch strings.ToLower(sc.Type) { - case ServiceCheckGRPC: - case ServiceCheckTCP: - case ServiceCheckHTTP: - if sc.Path == "" { - return fmt.Errorf("http type must have a valid http path") - } - url, err := url.Parse(sc.Path) - if err != nil { - return fmt.Errorf("http type must have a valid http path") - } - if url.IsAbs() { - return fmt.Errorf("http type must have a relative http path") - } - - case ServiceCheckScript: - if sc.Command == "" { - return fmt.Errorf("script type must have a valid script path") - } - - default: - return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type) - } - - // Validate interval and timeout - if sc.Interval == 0 { - return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval) - } else if sc.Interval < minCheckInterval { - return fmt.Errorf("interval (%v) cannot be lower than %v", sc.Interval, minCheckInterval) - } - - if sc.Timeout == 0 { - return fmt.Errorf("missing required value timeout. Timeout cannot be less than %v", minCheckInterval) - } else if sc.Timeout < minCheckTimeout { - return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval) - } - - // Validate InitialStatus - switch sc.InitialStatus { - case "": - case api.HealthPassing: - case api.HealthWarning: - case api.HealthCritical: - default: - return fmt.Errorf(`invalid initial check state (%s), must be one of %q, %q, %q or empty`, sc.InitialStatus, api.HealthPassing, api.HealthWarning, api.HealthCritical) - - } - - // Validate AddressMode - switch sc.AddressMode { - case "", AddressModeHost, AddressModeDriver: - // Ok - case AddressModeAuto: - return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto) - default: - return fmt.Errorf("invalid address_mode %q", sc.AddressMode) - } - - return sc.CheckRestart.Validate() -} - -// RequiresPort returns whether the service check requires the task has a port. -func (sc *ServiceCheck) RequiresPort() bool { - switch sc.Type { - case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP: - return true - default: - return false - } -} - -// TriggersRestarts returns true if this check should be watched and trigger a restart -// on failure. -func (sc *ServiceCheck) TriggersRestarts() bool { - return sc.CheckRestart != nil && sc.CheckRestart.Limit > 0 -} - -// Hash all ServiceCheck fields and the check's corresponding service ID to -// create an identifier. The identifier is not guaranteed to be unique as if -// the PortLabel is blank, the Service's PortLabel will be used after Hash is -// called. -func (sc *ServiceCheck) Hash(serviceID string) string { - h := sha1.New() - io.WriteString(h, serviceID) - io.WriteString(h, sc.Name) - io.WriteString(h, sc.Type) - io.WriteString(h, sc.Command) - io.WriteString(h, strings.Join(sc.Args, "")) - io.WriteString(h, sc.Path) - io.WriteString(h, sc.Protocol) - io.WriteString(h, sc.PortLabel) - io.WriteString(h, sc.Interval.String()) - io.WriteString(h, sc.Timeout.String()) - io.WriteString(h, sc.Method) - // Only include TLSSkipVerify if set to maintain ID stability with Nomad <0.6 - if sc.TLSSkipVerify { - io.WriteString(h, "true") - } - - // Since map iteration order isn't stable we need to write k/v pairs to - // a slice and sort it before hashing. - if len(sc.Header) > 0 { - headers := make([]string, 0, len(sc.Header)) - for k, v := range sc.Header { - headers = append(headers, k+strings.Join(v, "")) - } - sort.Strings(headers) - io.WriteString(h, strings.Join(headers, "")) - } - - // Only include AddressMode if set to maintain ID stability with Nomad <0.7.1 - if len(sc.AddressMode) > 0 { - io.WriteString(h, sc.AddressMode) - } - - // Only include GRPC if set to maintain ID stability with Nomad <0.8.4 - if sc.GRPCService != "" { - io.WriteString(h, sc.GRPCService) - } - if sc.GRPCUseTLS { - io.WriteString(h, "true") - } - - return fmt.Sprintf("%x", h.Sum(nil)) -} - -const ( - AddressModeAuto = "auto" - AddressModeHost = "host" - AddressModeDriver = "driver" -) - -// Service represents a Consul service definition in Nomad -type Service struct { - // Name of the service registered with Consul. Consul defaults the - // Name to ServiceID if not specified. The Name if specified is used - // as one of the seed values when generating a Consul ServiceID. - Name string - - // PortLabel is either the numeric port number or the `host:port`. - // To specify the port number using the host's Consul Advertise - // address, specify an empty host in the PortLabel (e.g. `:port`). - PortLabel string - - // AddressMode specifies whether or not to use the host ip:port for - // this service. - AddressMode string - - Tags []string // List of tags for the service - CanaryTags []string // List of tags for the service when it is a canary - Checks []*ServiceCheck // List of checks associated with the service - Connect *ConsulConnect // Consul Connect configuration -} - -func (s *Service) Copy() *Service { - if s == nil { - return nil - } - ns := new(Service) - *ns = *s - ns.Tags = helper.CopySliceString(ns.Tags) - ns.CanaryTags = helper.CopySliceString(ns.CanaryTags) - - if s.Checks != nil { - checks := make([]*ServiceCheck, len(ns.Checks)) - for i, c := range ns.Checks { - checks[i] = c.Copy() - } - ns.Checks = checks - } - - return ns -} - -// Canonicalize interpolates values of Job, Task Group and Task in the Service -// Name. This also generates check names, service id and check ids. -func (s *Service) Canonicalize(job string, taskGroup string, task string) { - // Ensure empty lists are treated as null to avoid scheduler issues when - // using DeepEquals - if len(s.Tags) == 0 { - s.Tags = nil - } - if len(s.CanaryTags) == 0 { - s.CanaryTags = nil - } - if len(s.Checks) == 0 { - s.Checks = nil - } - - s.Name = args.ReplaceEnv(s.Name, map[string]string{ - "JOB": job, - "TASKGROUP": taskGroup, - "TASK": task, - "BASE": fmt.Sprintf("%s-%s-%s", job, taskGroup, task), - }, - ) - - for _, check := range s.Checks { - check.Canonicalize(s.Name) - } -} - -// Validate checks if the Check definition is valid -func (s *Service) Validate() error { - var mErr multierror.Error - - // Ensure the service name is valid per the below RFCs but make an exception - // for our interpolation syntax by first stripping any environment variables from the name - - serviceNameStripped := args.ReplaceEnvWithPlaceHolder(s.Name, "ENV-VAR") - - if err := s.ValidateName(serviceNameStripped); err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes: %q", s.Name)) - } - - switch s.AddressMode { - case "", AddressModeAuto, AddressModeHost, AddressModeDriver: - // OK - default: - mErr.Errors = append(mErr.Errors, fmt.Errorf("service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode)) - } - - for _, c := range s.Checks { - if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() { - mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: check requires a port but neither check nor service %+q have a port", c.Name, s.Name)) - continue - } - - if err := c.validate(); err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: %v", c.Name, err)) - } - } - - return mErr.ErrorOrNil() -} - -// ValidateName checks if the services Name is valid and should be called after -// the name has been interpolated -func (s *Service) ValidateName(name string) error { - // Ensure the service name is valid per RFC-952 §1 - // (https://tools.ietf.org/html/rfc952), RFC-1123 §2.1 - // (https://tools.ietf.org/html/rfc1123), and RFC-2782 - // (https://tools.ietf.org/html/rfc2782). - re := regexp.MustCompile(`^(?i:[a-z0-9]|[a-z0-9][a-z0-9\-]{0,61}[a-z0-9])$`) - if !re.MatchString(name) { - return fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes and must be no longer than 63 characters: %q", name) - } - return nil -} - -// Hash returns a base32 encoded hash of a Service's contents excluding checks -// as they're hashed independently. -func (s *Service) Hash(allocID, taskName string, canary bool) string { - h := sha1.New() - io.WriteString(h, allocID) - io.WriteString(h, taskName) - io.WriteString(h, s.Name) - io.WriteString(h, s.PortLabel) - io.WriteString(h, s.AddressMode) - for _, tag := range s.Tags { - io.WriteString(h, tag) - } - for _, tag := range s.CanaryTags { - io.WriteString(h, tag) - } - - // Vary ID on whether or not CanaryTags will be used - if canary { - h.Write([]byte("Canary")) - } - - // Base32 is used for encoding the hash as sha1 hashes can always be - // encoded without padding, only 4 bytes larger than base64, and saves - // 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice - // to have a reasonably compact URL-safe representation. - return b32.EncodeToString(h.Sum(nil)) -} - -func (s *Service) Equals(o *Service) bool { - if s == nil || o == nil { - return s == o - } - - if s.AddressMode != o.AddressMode { - return false - } - - if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) { - return false - } - - if len(s.Checks) != len(o.Checks) { - return false - } - -OUTER: - for i := range s.Checks { - for ii := range o.Checks { - if s.Checks[i].Equals(o.Checks[ii]) { - // Found match; continue with next check - continue OUTER - } - } - - // No match - return false - } - - if !s.Connect.Equals(o.Connect) { - return false - } - - if s.Name != o.Name { - return false - } - - if s.PortLabel != o.PortLabel { - return false - } - - if !helper.CompareSliceSetString(s.Tags, o.Tags) { - return false - } - - return true -} - const ( // DefaultKillTimeout is the default timeout between signaling a task it // will be killed and killing it. diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 6f1fc03f6..5456d9db5 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2235,6 +2235,69 @@ func TestService_Canonicalize(t *testing.T) { } +func TestService_Validate(t *testing.T) { + s := Service{ + Name: "testservice", + } + + s.Canonicalize("testjob", "testgroup", "testtask") + + // Base service should be valid + require.NoError(t, s.Validate()) + + // Native Connect should be valid + s.Connect = &ConsulConnect{ + Native: true, + } + require.NoError(t, s.Validate()) + + // Native Connect + Sidecar should be invalid + s.Connect.SidecarService = &ConsulSidecarService{} + require.Error(t, s.Validate()) +} + +func TestService_Equals(t *testing.T) { + s := Service{ + Name: "testservice", + } + + s.Canonicalize("testjob", "testgroup", "testtask") + + o := s.Copy() + + // Base service should be equal to copy of itself + require.True(t, s.Equals(o)) + + // create a helper to assert a diff and reset the struct + assertDiff := func() { + require.False(t, s.Equals(o)) + o = s.Copy() + require.True(t, s.Equals(o), "bug in copy") + } + + // Changing any field should cause inequality + o.Name = "diff" + assertDiff() + + o.PortLabel = "diff" + assertDiff() + + o.AddressMode = AddressModeDriver + assertDiff() + + o.Tags = []string{"diff"} + assertDiff() + + o.CanaryTags = []string{"diff"} + assertDiff() + + o.Checks = []*ServiceCheck{{Name: "diff"}} + assertDiff() + + o.Connect = &ConsulConnect{Native: true} + assertDiff() +} + func TestJob_ExpandServiceNames(t *testing.T) { j := &Job{ Name: "my-job",