consul: add Connect structs

Refactor all Consul structs into {api,structs}/services.go because
api/tasks.go didn't make sense anymore and structs/structs.go is
gigantic.
This commit is contained in:
Michael Schurter
2019-07-30 15:40:45 -07:00
parent 2e0d67cbbb
commit 75e5e033fd
14 changed files with 1385 additions and 900 deletions

157
api/services.go Normal file
View File

@@ -0,0 +1,157 @@
package api
import (
"fmt"
"time"
)
// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
Limit int `mapstructure:"limit"`
Grace *time.Duration `mapstructure:"grace"`
IgnoreWarnings bool `mapstructure:"ignore_warnings"`
}
// Canonicalize CheckRestart fields if not nil.
func (c *CheckRestart) Canonicalize() {
if c == nil {
return
}
if c.Grace == nil {
c.Grace = timeToPtr(1 * time.Second)
}
}
// Copy returns a copy of CheckRestart or nil if unset.
func (c *CheckRestart) Copy() *CheckRestart {
if c == nil {
return nil
}
nc := new(CheckRestart)
nc.Limit = c.Limit
if c.Grace != nil {
g := *c.Grace
nc.Grace = &g
}
nc.IgnoreWarnings = c.IgnoreWarnings
return nc
}
// Merge values from other CheckRestart over default values on this
// CheckRestart and return merged copy.
func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart {
if c == nil {
// Just return other
return o
}
nc := c.Copy()
if o == nil {
// Nothing to merge
return nc
}
if o.Limit > 0 {
nc.Limit = o.Limit
}
if o.Grace != nil {
nc.Grace = o.Grace
}
if o.IgnoreWarnings {
nc.IgnoreWarnings = o.IgnoreWarnings
}
return nc
}
// ServiceCheck represents the consul health check that Nomad registers.
type ServiceCheck struct {
//FIXME Id is unused. Remove?
Id string
Name string
Type string
Command string
Args []string
Path string
Protocol string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Interval time.Duration
Timeout time.Duration
InitialStatus string `mapstructure:"initial_status"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
Header map[string][]string
Method string
CheckRestart *CheckRestart `mapstructure:"check_restart"`
GRPCService string `mapstructure:"grpc_service"`
GRPCUseTLS bool `mapstructure:"grpc_use_tls"`
}
// Service represents a Consul service definition.
type Service struct {
//FIXME Id is unused. Remove?
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
}
// Canonicalize the Service by ensuring its name and address mode are set. Task
// will be nil for group services.
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.Name == "" {
if t != nil {
s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name)
} else {
s.Name = fmt.Sprintf("%s-%s", *job.Name, *tg.Name)
}
}
// Default to AddressModeAuto
if s.AddressMode == "" {
s.AddressMode = "auto"
}
// Canonicalize CheckRestart on Checks and merge Service.CheckRestart
// into each check.
for i, check := range s.Checks {
s.Checks[i].CheckRestart = s.CheckRestart.Merge(check.CheckRestart)
s.Checks[i].CheckRestart.Canonicalize()
}
}
// ConsulConnect represents a Consul Connect jobspec stanza.
type ConsulConnect struct {
Native bool
SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"`
}
// ConsulSidecarService represents a Consul Connect SidecarService jobspec
// stanza.
type ConsulSidecarService struct {
Port string
Proxy *ConsulProxy
}
// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza.
type ConsulProxy struct {
Upstreams []*ConsulUpstream
Config map[string]interface{}
}
// ConsulUpstream represents a Consul Connect upstream jobspec stanza.
type ConsulUpstream struct {
DestinationName string `mapstructure:"destination_name"`
LocalBindPort int `mapstructure:"local_bind_port"`
}

View File

@@ -274,144 +274,6 @@ func (s *Spread) Canonicalize() {
}
}
// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
Limit int `mapstructure:"limit"`
Grace *time.Duration `mapstructure:"grace"`
IgnoreWarnings bool `mapstructure:"ignore_warnings"`
}
// Canonicalize CheckRestart fields if not nil.
func (c *CheckRestart) Canonicalize() {
if c == nil {
return
}
if c.Grace == nil {
c.Grace = timeToPtr(1 * time.Second)
}
}
// Copy returns a copy of CheckRestart or nil if unset.
func (c *CheckRestart) Copy() *CheckRestart {
if c == nil {
return nil
}
nc := new(CheckRestart)
nc.Limit = c.Limit
if c.Grace != nil {
g := *c.Grace
nc.Grace = &g
}
nc.IgnoreWarnings = c.IgnoreWarnings
return nc
}
// Merge values from other CheckRestart over default values on this
// CheckRestart and return merged copy.
func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart {
if c == nil {
// Just return other
return o
}
nc := c.Copy()
if o == nil {
// Nothing to merge
return nc
}
if o.Limit > 0 {
nc.Limit = o.Limit
}
if o.Grace != nil {
nc.Grace = o.Grace
}
if o.IgnoreWarnings {
nc.IgnoreWarnings = o.IgnoreWarnings
}
return nc
}
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string
Name string
Type string
Command string
Args []string
Path string
Protocol string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Interval time.Duration
Timeout time.Duration
InitialStatus string `mapstructure:"initial_status"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
Header map[string][]string
Method string
CheckRestart *CheckRestart `mapstructure:"check_restart"`
GRPCService string `mapstructure:"grpc_service"`
GRPCUseTLS bool `mapstructure:"grpc_use_tls"`
}
// The Service model represents a Consul service definition
type Service struct {
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
}
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.Name == "" {
s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name)
}
// Default to AddressModeAuto
if s.AddressMode == "" {
s.AddressMode = "auto"
}
// Canonicalize CheckRestart on Checks and merge Service.CheckRestart
// into each check.
for i, check := range s.Checks {
s.Checks[i].CheckRestart = s.CheckRestart.Merge(check.CheckRestart)
s.Checks[i].CheckRestart.Canonicalize()
}
}
type ConsulConnect struct {
SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"`
}
type ConsulSidecarService struct {
Port string
Proxy *ConsulProxy
}
type ConsulProxy struct {
Upstreams []*ConsulUpstream
}
type ConsulUpstream struct {
//FIXME Pointers?
DestinationName string `mapstructure:"destination_name"`
LocalBindPort int `mapstructure:"local_bind_port"`
}
// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
Sticky *bool
@@ -629,6 +491,9 @@ func (g *TaskGroup) Canonicalize(job *Job) {
for _, n := range g.Networks {
n.Canonicalize()
}
for _, s := range g.Services {
s.Canonicalize(nil, g, job)
}
}
// Constrain is used to add a constraint to a task group.

View File

@@ -998,7 +998,9 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
continue
}
out[i].Connect = &structs.ConsulConnect{}
out[i].Connect = &structs.ConsulConnect{
Native: s.Connect.Native,
}
if s.Connect.SidecarService == nil {
continue
@@ -1012,7 +1014,9 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
continue
}
out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{}
out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{
Config: s.Connect.SidecarService.Proxy.Config,
}
upstreams := make([]*structs.ConsulUpstream, len(s.Connect.SidecarService.Proxy.Upstreams))
for i, p := range s.Connect.SidecarService.Proxy.Upstreams {

View File

@@ -1492,10 +1492,43 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
ProgressDeadline: helper.TimeToPtr(5 * time.Minute),
AutoRevert: helper.BoolToPtr(true),
},
Meta: map[string]string{
"key": "value",
},
Services: []*api.Service{
{
Name: "groupserviceA",
Tags: []string{"a", "b"},
CanaryTags: []string{"d", "e"},
PortLabel: "1234",
CheckRestart: &api.CheckRestart{
Limit: 4,
Grace: helper.TimeToPtr(11 * time.Second),
},
Checks: []api.ServiceCheck{
{
Id: "hello",
Name: "bar",
Type: "http",
Command: "foo",
Args: []string{"a", "b"},
Path: "/check",
Protocol: "http",
PortLabel: "foo",
AddressMode: "driver",
GRPCService: "foo.Bar",
GRPCUseTLS: true,
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",
CheckRestart: &api.CheckRestart{
Limit: 3,
IgnoreWarnings: true,
},
},
},
},
},
Tasks: []*api.Task{
{
Name: "task1",
@@ -1798,6 +1831,37 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"key": "value",
},
Services: []*structs.Service{
{
Name: "groupserviceA",
Tags: []string{"a", "b"},
CanaryTags: []string{"d", "e"},
PortLabel: "1234",
AddressMode: "auto",
Checks: []*structs.ServiceCheck{
{
Name: "bar",
Type: "http",
Command: "foo",
Args: []string{"a", "b"},
Path: "/check",
Protocol: "http",
PortLabel: "foo",
AddressMode: "driver",
GRPCService: "foo.Bar",
GRPCUseTLS: true,
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",
CheckRestart: &structs.CheckRestart{
Grace: 11 * time.Second,
Limit: 3,
IgnoreWarnings: true,
},
},
},
},
},
Tasks: []*structs.Task{
{
Name: "task1",

View File

@@ -947,9 +947,12 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list
}
if o := listVal.Filter("service"); len(o.Items) > 0 {
if err := parseServices(jobName, taskGroupName, &t, o); err != nil {
services, err := parseServices(jobName, taskGroupName, o)
if err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
t.Services = services
}
// If we have config, then parse that
@@ -1287,8 +1290,8 @@ func parseGroupServices(jobName string, taskGroupName string, g *api.TaskGroup,
return nil
}
func parseServices(jobName string, taskGroupName string, task *api.Task, serviceObjs *ast.ObjectList) error {
task.Services = make([]*api.Service, len(serviceObjs.Items))
func parseServices(jobName string, taskGroupName string, serviceObjs *ast.ObjectList) ([]*api.Service, error) {
services := make([]*api.Service, len(serviceObjs.Items))
for idx, o := range serviceObjs.Items {
// Check for invalid keys
valid := []string{
@@ -1299,22 +1302,24 @@ func parseServices(jobName string, taskGroupName string, task *api.Task, service
"check",
"address_mode",
"check_restart",
"connect",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx))
return nil, multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx))
}
var service api.Service
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
return nil, err
}
delete(m, "check")
delete(m, "check_restart")
delete(m, "connect")
if err := mapstructure.WeakDecode(m, &service); err != nil {
return err
return nil, err
}
// Filter checks
@@ -1322,33 +1327,236 @@ func parseServices(jobName string, taskGroupName string, task *api.Task, service
if ot, ok := o.Val.(*ast.ObjectType); ok {
checkList = ot.List
} else {
return fmt.Errorf("service '%s': should be an object", service.Name)
return nil, fmt.Errorf("service '%s': should be an object", service.Name)
}
if co := checkList.Filter("check"); len(co.Items) > 0 {
if err := parseChecks(&service, co); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
}
}
// Filter check_restart
if cro := checkList.Filter("check_restart"); len(cro.Items) > 0 {
if len(cro.Items) > 1 {
return fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name)
return nil, fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name)
}
if cr, err := parseCheckRestart(cro.Items[0]); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
} else {
service.CheckRestart = cr
}
}
task.Services[idx] = &service
// Filter connect
if co := checkList.Filter("connect"); len(co.Items) > 0 {
if len(co.Items) > 1 {
return nil, fmt.Errorf("connect '%s': cannot have more than 1 connect stanza", service.Name)
}
c, err := parseConnect(co.Items[0])
if err != nil {
return nil, multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
}
service.Connect = c
}
services[idx] = &service
}
return nil
return services, nil
}
func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) {
valid := []string{
"native",
"sidecar_service",
}
if err := helper.CheckHCLKeys(co.Val, valid); err != nil {
return nil, multierror.Prefix(err, "connect ->")
}
var connect api.ConsulConnect
var m map[string]interface{}
if err := hcl.DecodeObject(&m, co.Val); err != nil {
return nil, err
}
delete(m, "sidecar_service")
if err := mapstructure.WeakDecode(m, &connect); err != nil {
return nil, err
}
var connectList *ast.ObjectList
if ot, ok := co.Val.(*ast.ObjectType); ok {
connectList = ot.List
} else {
return nil, fmt.Errorf("connect should be an object")
}
// Parse the sidecar_service
o := connectList.Filter("sidecar_service")
if len(o.Items) == 0 {
return &connect, nil
}
if len(o.Items) > 1 {
return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task")
}
r, err := parseSidecarService(o.Items[0])
if err != nil {
return nil, fmt.Errorf("sidecar_service, %v", err)
}
connect.SidecarService = r
return &connect, nil
}
func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
valid := []string{
"port",
"proxy",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "sidecar_service ->")
}
var sidecar api.ConsulSidecarService
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return nil, err
}
delete(m, "proxy")
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &sidecar,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, fmt.Errorf("foo: %v", err)
}
var proxyList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
proxyList = ot.List
} else {
return nil, fmt.Errorf("sidecar_service: should be an object")
}
// Parse the proxy
po := proxyList.Filter("proxy")
if len(po.Items) == 0 {
return &sidecar, nil
}
if len(po.Items) > 1 {
return nil, fmt.Errorf("only one 'proxy' block allowed per task")
}
r, err := parseProxy(po.Items[0])
if err != nil {
return nil, fmt.Errorf("proxy, %v", err)
}
sidecar.Proxy = r
return &sidecar, nil
}
func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) {
valid := []string{
"upstreams",
"config",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "proxy ->")
}
var proxy api.ConsulProxy
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return nil, fmt.Errorf("proxy: should be an object")
}
// Parse the proxy
uo := listVal.Filter("upstreams")
proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items))
for i := range uo.Items {
u, err := parseUpstream(uo.Items[i])
if err != nil {
return nil, err
}
proxy.Upstreams[i] = u
}
// If we have config, then parse that
if o := listVal.Filter("config"); len(o.Items) > 1 {
return nil, fmt.Errorf("only 1 meta object supported")
} else if len(o.Items) == 1 {
var mSlice []map[string]interface{}
if err := hcl.DecodeObject(&mSlice, o.Items[0].Val); err != nil {
return nil, err
}
if len(mSlice) > 1 {
return nil, fmt.Errorf("only 1 meta object supported")
}
m := mSlice[0]
if err := mapstructure.WeakDecode(m, &proxy.Config); err != nil {
return nil, err
}
proxy.Config = flattenMapSlice(proxy.Config)
}
return &proxy, nil
}
func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
valid := []string{
"destination_name",
"local_bind_port",
}
if err := helper.CheckHCLKeys(uo.Val, valid); err != nil {
return nil, multierror.Prefix(err, "upstream ->")
}
var upstream api.ConsulUpstream
var m map[string]interface{}
if err := hcl.DecodeObject(&m, uo.Val); err != nil {
return nil, err
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &upstream,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}
return &upstream, nil
}
func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
service.Checks = make([]api.ServiceCheck, len(checkObjs.Items))
for idx, co := range checkObjs.Items {
@@ -1483,162 +1691,6 @@ func parseCheckRestart(cro *ast.ObjectItem) (*api.CheckRestart, error) {
return &checkRestart, nil
}
func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) {
valid := []string{
"sidecar_service",
}
if err := helper.CheckHCLKeys(co.Val, valid); err != nil {
return nil, multierror.Prefix(err, "connect ->")
}
var connect api.ConsulConnect
var connectList *ast.ObjectList
if ot, ok := co.Val.(*ast.ObjectType); ok {
connectList = ot.List
} else {
return nil, fmt.Errorf("connect should be an object")
}
// Parse the sidecar_service
o := connectList.Filter("sidecar_service")
if len(o.Items) == 0 {
return nil, nil
}
if len(o.Items) > 1 {
return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task")
}
r, err := parseSidecarService(o.Items[0])
if err != nil {
return nil, fmt.Errorf("sidecar_service, %v", err)
}
connect.SidecarService = r
return &connect, nil
}
func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
valid := []string{
"port",
"proxy",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "sidecar_service ->")
}
var sidecar api.ConsulSidecarService
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return nil, err
}
delete(m, "proxy")
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &sidecar,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, fmt.Errorf("foo: %v", err)
}
var proxyList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
proxyList = ot.List
} else {
return nil, fmt.Errorf("sidecar_service: should be an object")
}
// Parse the proxy
po := proxyList.Filter("proxy")
if len(po.Items) == 0 {
return &sidecar, nil
}
if len(po.Items) > 1 {
return nil, fmt.Errorf("only one 'proxy' block allowed per task")
}
r, err := parseProxy(po.Items[0])
if err != nil {
return nil, fmt.Errorf("proxy, %v", err)
}
sidecar.Proxy = r
return &sidecar, nil
}
func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) {
valid := []string{
"upstreams",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "proxy ->")
}
var proxy api.ConsulProxy
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return nil, fmt.Errorf("proxy: should be an object")
}
// Parse the proxy
uo := listVal.Filter("upstreams")
proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items))
for i := range uo.Items {
u, err := parseUpstream(uo.Items[i])
if err != nil {
return nil, err
}
proxy.Upstreams[i] = u
}
return &proxy, nil
}
func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
valid := []string{
"destination_name",
"local_bind_port",
}
if err := helper.CheckHCLKeys(uo.Val, valid); err != nil {
return nil, multierror.Prefix(err, "upstream ->")
}
var upstream api.ConsulUpstream
var m map[string]interface{}
if err := hcl.DecodeObject(&m, uo.Val); err != nil {
return nil, err
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &upstream,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}
return &upstream, nil
}
func parseResources(result *api.Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) == 0 {

View File

@@ -893,6 +893,26 @@ func TestParse(t *testing.T) {
},
},
},
Services: []*api.Service{
{
Name: "connect-service",
Tags: []string{"foo", "bar"},
CanaryTags: []string{"canary", "bam"},
PortLabel: "1234",
Connect: &api.ConsulConnect{
SidecarService: &api.ConsulSidecarService{
Proxy: &api.ConsulProxy{
Upstreams: []*api.ConsulUpstream{
{
DestinationName: "other-service",
LocalBindPort: 4567,
},
},
},
},
},
},
},
Tasks: []*api.Task{
{
Name: "bar",

View File

@@ -9,6 +9,25 @@ job "foo" {
to = 8080
}
}
service {
name = "connect-service"
tags = ["foo", "bar"]
canary_tags = ["canary", "bam"]
port = "1234"
connect {
sidecar_service {
proxy {
upstreams {
destination_name = "other-service"
local_bind_port = 4567
}
}
}
}
}
task "bar" {
driver = "raw_exec"
config {

34
jobspec/utils.go Normal file
View File

@@ -0,0 +1,34 @@
package jobspec
// flattenMapSlice flattens any occurrences of []map[string]interface{} into
// map[string]interface{}.
func flattenMapSlice(m map[string]interface{}) map[string]interface{} {
newM := make(map[string]interface{}, len(m))
for k, v := range m {
var newV interface{}
switch mapV := v.(type) {
case []map[string]interface{}:
// Recurse into each map and flatten values
newMap := map[string]interface{}{}
for _, innerM := range mapV {
for innerK, innerV := range flattenMapSlice(innerM) {
newMap[innerK] = innerV
}
}
newV = newMap
case map[string]interface{}:
// Recursively flatten maps
newV = flattenMapSlice(mapV)
default:
newV = v
}
newM[k] = newV
}
return newM
}

41
jobspec/utils_test.go Normal file
View File

@@ -0,0 +1,41 @@
package jobspec
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestFlattenMapSlice asserts flattenMapSlice recursively flattens a slice of maps into a
// single map.
func TestFlattenMapSlice(t *testing.T) {
t.Parallel()
input := map[string]interface{}{
"foo": 123,
"bar": []map[string]interface{}{
{
"baz": 456,
},
{
"baz": 789,
},
{
"baax": true,
},
},
"nil": nil,
}
output := map[string]interface{}{
"foo": 123,
"bar": map[string]interface{}{
"baz": 789,
"baax": true,
},
"nil": nil,
}
require.Equal(t, output, flattenMapSlice(input))
}

View File

@@ -1,109 +0,0 @@
package structs
type ConsulConnect struct {
SidecarService *ConsulSidecarService
}
func (c *ConsulConnect) Copy() *ConsulConnect {
return &ConsulConnect{
SidecarService: c.SidecarService.Copy(),
}
}
func (c *ConsulConnect) Equals(o *ConsulConnect) bool {
if c == nil || o == nil {
return c == o
}
return c.SidecarService.Equals(o.SidecarService)
}
func (c *ConsulConnect) HasSidecar() bool {
return c != nil && c.SidecarService != nil
}
type ConsulSidecarService struct {
Port string
Proxy *ConsulProxy
}
func (s *ConsulSidecarService) Copy() *ConsulSidecarService {
return &ConsulSidecarService{
Port: s.Port,
Proxy: s.Proxy.Copy(),
}
}
func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool {
if s == nil || o == nil {
return s == o
}
if s.Port != o.Port {
return false
}
return s.Proxy.Equals(o.Proxy)
}
type ConsulProxy struct {
Upstreams []*ConsulUpstream
}
func (p *ConsulProxy) Copy() *ConsulProxy {
upstreams := make([]*ConsulUpstream, len(p.Upstreams))
for i := range p.Upstreams {
upstreams[i] = p.Upstreams[i].Copy()
}
return &ConsulProxy{
Upstreams: upstreams,
}
}
func (p *ConsulProxy) Equals(o *ConsulProxy) bool {
if p == nil || o == nil {
return p == o
}
if len(p.Upstreams) != len(o.Upstreams) {
return false
}
// Order doesn't matter
OUTER:
for _, up := range p.Upstreams {
for _, innerUp := range o.Upstreams {
if up.Equals(innerUp) {
// Match; find next upstream
continue OUTER
}
}
// No match
return false
}
return true
}
type ConsulUpstream struct {
DestinationName string
LocalBindPort int
}
func (u *ConsulUpstream) Copy() *ConsulUpstream {
return &ConsulUpstream{
DestinationName: u.DestinationName,
LocalBindPort: u.LocalBindPort,
}
}
func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool {
if u == nil || o == nil {
return u == o
}
return (*u) == (*o)
}

695
nomad/structs/services.go Normal file
View File

@@ -0,0 +1,695 @@
package structs
import (
"crypto/sha1"
"fmt"
"io"
"net/url"
"reflect"
"regexp"
"sort"
"strings"
"time"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
)
const (
ServiceCheckHTTP = "http"
ServiceCheckTCP = "tcp"
ServiceCheckScript = "script"
ServiceCheckGRPC = "grpc"
// minCheckInterval is the minimum check interval permitted. Consul
// currently has its MinInterval set to 1s. Mirror that here for
// consistency.
minCheckInterval = 1 * time.Second
// minCheckTimeout is the minimum check timeout permitted for Consul
// script TTL checks.
minCheckTimeout = 1 * time.Second
)
// ServiceCheck represents the Consul health check.
type ServiceCheck struct {
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Command string // Command is the command to run for script checks
Args []string // Args is a list of arguments for script checks
Path string // path of the health check url for http type check
Protocol string // Protocol to use if check is http, defaults to http
PortLabel string // The port to use for tcp/http checks
AddressMode string // 'host' to use host ip:port or 'driver' to use driver's
Interval time.Duration // Interval of the check
Timeout time.Duration // Timeout of the response from the check before consul fails the check
InitialStatus string // Initial status of the check
TLSSkipVerify bool // Skip TLS verification when Protocol=https
Method string // HTTP Method to use (GET by default)
Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks
CheckRestart *CheckRestart // If and when a task should be restarted based on checks
GRPCService string // Service for GRPC checks
GRPCUseTLS bool // Whether or not to use TLS for GRPC checks
}
// Copy the stanza recursively. Returns nil if nil.
func (sc *ServiceCheck) Copy() *ServiceCheck {
if sc == nil {
return nil
}
nsc := new(ServiceCheck)
*nsc = *sc
nsc.Args = helper.CopySliceString(sc.Args)
nsc.Header = helper.CopyMapStringSliceString(sc.Header)
nsc.CheckRestart = sc.CheckRestart.Copy()
return nsc
}
// Equals returns true if the structs are recursively equal.
func (sc *ServiceCheck) Equals(o *ServiceCheck) bool {
if sc == nil || o == nil {
return sc == o
}
if sc.Name != o.Name {
return false
}
if sc.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(sc.Args, o.Args) {
return false
}
if !sc.CheckRestart.Equals(o.CheckRestart) {
return false
}
if sc.Command != o.Command {
return false
}
if sc.GRPCService != o.GRPCService {
return false
}
if sc.GRPCUseTLS != o.GRPCUseTLS {
return false
}
// Use DeepEqual here as order of slice values could matter
if !reflect.DeepEqual(sc.Header, o.Header) {
return false
}
if sc.InitialStatus != o.InitialStatus {
return false
}
if sc.Interval != o.Interval {
return false
}
if sc.Method != o.Method {
return false
}
if sc.Path != o.Path {
return false
}
if sc.PortLabel != o.Path {
return false
}
if sc.Protocol != o.Protocol {
return false
}
if sc.TLSSkipVerify != o.TLSSkipVerify {
return false
}
if sc.Timeout != o.Timeout {
return false
}
if sc.Type != o.Type {
return false
}
return true
}
func (sc *ServiceCheck) Canonicalize(serviceName string) {
// Ensure empty maps/slices are treated as null to avoid scheduling
// issues when using DeepEquals.
if len(sc.Args) == 0 {
sc.Args = nil
}
if len(sc.Header) == 0 {
sc.Header = nil
} else {
for k, v := range sc.Header {
if len(v) == 0 {
sc.Header[k] = nil
}
}
}
if sc.Name == "" {
sc.Name = fmt.Sprintf("service: %q check", serviceName)
}
}
// validate a Service's ServiceCheck
func (sc *ServiceCheck) validate() error {
// Validate Type
switch strings.ToLower(sc.Type) {
case ServiceCheckGRPC:
case ServiceCheckTCP:
case ServiceCheckHTTP:
if sc.Path == "" {
return fmt.Errorf("http type must have a valid http path")
}
url, err := url.Parse(sc.Path)
if err != nil {
return fmt.Errorf("http type must have a valid http path")
}
if url.IsAbs() {
return fmt.Errorf("http type must have a relative http path")
}
case ServiceCheckScript:
if sc.Command == "" {
return fmt.Errorf("script type must have a valid script path")
}
default:
return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type)
}
// Validate interval and timeout
if sc.Interval == 0 {
return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval)
} else if sc.Interval < minCheckInterval {
return fmt.Errorf("interval (%v) cannot be lower than %v", sc.Interval, minCheckInterval)
}
if sc.Timeout == 0 {
return fmt.Errorf("missing required value timeout. Timeout cannot be less than %v", minCheckInterval)
} else if sc.Timeout < minCheckTimeout {
return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval)
}
// Validate InitialStatus
switch sc.InitialStatus {
case "":
case api.HealthPassing:
case api.HealthWarning:
case api.HealthCritical:
default:
return fmt.Errorf(`invalid initial check state (%s), must be one of %q, %q, %q or empty`, sc.InitialStatus, api.HealthPassing, api.HealthWarning, api.HealthCritical)
}
// Validate AddressMode
switch sc.AddressMode {
case "", AddressModeHost, AddressModeDriver:
// Ok
case AddressModeAuto:
return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto)
default:
return fmt.Errorf("invalid address_mode %q", sc.AddressMode)
}
return sc.CheckRestart.Validate()
}
// RequiresPort returns whether the service check requires the task has a port.
func (sc *ServiceCheck) RequiresPort() bool {
switch sc.Type {
case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP:
return true
default:
return false
}
}
// TriggersRestarts returns true if this check should be watched and trigger a restart
// on failure.
func (sc *ServiceCheck) TriggersRestarts() bool {
return sc.CheckRestart != nil && sc.CheckRestart.Limit > 0
}
// Hash all ServiceCheck fields and the check's corresponding service ID to
// create an identifier. The identifier is not guaranteed to be unique as if
// the PortLabel is blank, the Service's PortLabel will be used after Hash is
// called.
func (sc *ServiceCheck) Hash(serviceID string) string {
h := sha1.New()
io.WriteString(h, serviceID)
io.WriteString(h, sc.Name)
io.WriteString(h, sc.Type)
io.WriteString(h, sc.Command)
io.WriteString(h, strings.Join(sc.Args, ""))
io.WriteString(h, sc.Path)
io.WriteString(h, sc.Protocol)
io.WriteString(h, sc.PortLabel)
io.WriteString(h, sc.Interval.String())
io.WriteString(h, sc.Timeout.String())
io.WriteString(h, sc.Method)
// Only include TLSSkipVerify if set to maintain ID stability with Nomad <0.6
if sc.TLSSkipVerify {
io.WriteString(h, "true")
}
// Since map iteration order isn't stable we need to write k/v pairs to
// a slice and sort it before hashing.
if len(sc.Header) > 0 {
headers := make([]string, 0, len(sc.Header))
for k, v := range sc.Header {
headers = append(headers, k+strings.Join(v, ""))
}
sort.Strings(headers)
io.WriteString(h, strings.Join(headers, ""))
}
// Only include AddressMode if set to maintain ID stability with Nomad <0.7.1
if len(sc.AddressMode) > 0 {
io.WriteString(h, sc.AddressMode)
}
// Only include GRPC if set to maintain ID stability with Nomad <0.8.4
if sc.GRPCService != "" {
io.WriteString(h, sc.GRPCService)
}
if sc.GRPCUseTLS {
io.WriteString(h, "true")
}
return fmt.Sprintf("%x", h.Sum(nil))
}
const (
AddressModeAuto = "auto"
AddressModeHost = "host"
AddressModeDriver = "driver"
)
// Service represents a Consul service definition
type Service struct {
// Name of the service registered with Consul. Consul defaults the
// Name to ServiceID if not specified. The Name if specified is used
// as one of the seed values when generating a Consul ServiceID.
Name string
// PortLabel is either the numeric port number or the `host:port`.
// To specify the port number using the host's Consul Advertise
// address, specify an empty host in the PortLabel (e.g. `:port`).
PortLabel string
// AddressMode specifies whether or not to use the host ip:port for
// this service.
AddressMode string
Tags []string // List of tags for the service
CanaryTags []string // List of tags for the service when it is a canary
Checks []*ServiceCheck // List of checks associated with the service
Connect *ConsulConnect // Consul Connect configuration
}
// Copy the stanza recursively. Returns nil if nil.
func (s *Service) Copy() *Service {
if s == nil {
return nil
}
ns := new(Service)
*ns = *s
ns.Tags = helper.CopySliceString(ns.Tags)
ns.CanaryTags = helper.CopySliceString(ns.CanaryTags)
if s.Checks != nil {
checks := make([]*ServiceCheck, len(ns.Checks))
for i, c := range ns.Checks {
checks[i] = c.Copy()
}
ns.Checks = checks
}
ns.Connect = s.Connect.Copy()
return ns
}
// Canonicalize interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) Canonicalize(job string, taskGroup string, task string) {
// Ensure empty lists are treated as null to avoid scheduler issues when
// using DeepEquals
if len(s.Tags) == 0 {
s.Tags = nil
}
if len(s.CanaryTags) == 0 {
s.CanaryTags = nil
}
if len(s.Checks) == 0 {
s.Checks = nil
}
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
"TASK": task,
"BASE": fmt.Sprintf("%s-%s-%s", job, taskGroup, task),
},
)
for _, check := range s.Checks {
check.Canonicalize(s.Name)
}
}
// Validate checks if the Check definition is valid
func (s *Service) Validate() error {
var mErr multierror.Error
// Ensure the service name is valid per the below RFCs but make an exception
// for our interpolation syntax by first stripping any environment variables from the name
serviceNameStripped := args.ReplaceEnvWithPlaceHolder(s.Name, "ENV-VAR")
if err := s.ValidateName(serviceNameStripped); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes: %q", s.Name))
}
switch s.AddressMode {
case "", AddressModeAuto, AddressModeHost, AddressModeDriver:
// OK
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode))
}
for _, c := range s.Checks {
if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: check requires a port but neither check nor service %+q have a port", c.Name, s.Name))
continue
}
if err := c.validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: %v", c.Name, err))
}
}
if s.Connect != nil {
if err := s.Connect.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// ValidateName checks if the services Name is valid and should be called after
// the name has been interpolated
func (s *Service) ValidateName(name string) error {
// Ensure the service name is valid per RFC-952 §1
// (https://tools.ietf.org/html/rfc952), RFC-1123 §2.1
// (https://tools.ietf.org/html/rfc1123), and RFC-2782
// (https://tools.ietf.org/html/rfc2782).
re := regexp.MustCompile(`^(?i:[a-z0-9]|[a-z0-9][a-z0-9\-]{0,61}[a-z0-9])$`)
if !re.MatchString(name) {
return fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes and must be no longer than 63 characters: %q", name)
}
return nil
}
// Hash returns a base32 encoded hash of a Service's contents excluding checks
// as they're hashed independently.
func (s *Service) Hash(allocID, taskName string, canary bool) string {
h := sha1.New()
io.WriteString(h, allocID)
io.WriteString(h, taskName)
io.WriteString(h, s.Name)
io.WriteString(h, s.PortLabel)
io.WriteString(h, s.AddressMode)
for _, tag := range s.Tags {
io.WriteString(h, tag)
}
for _, tag := range s.CanaryTags {
io.WriteString(h, tag)
}
// Vary ID on whether or not CanaryTags will be used
if canary {
h.Write([]byte("Canary"))
}
// Base32 is used for encoding the hash as sha1 hashes can always be
// encoded without padding, only 4 bytes larger than base64, and saves
// 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice
// to have a reasonably compact URL-safe representation.
return b32.EncodeToString(h.Sum(nil))
}
// Equals returns true if the structs are recursively equal.
func (s *Service) Equals(o *Service) bool {
if s == nil || o == nil {
return s == o
}
if s.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) {
return false
}
if len(s.Checks) != len(o.Checks) {
return false
}
OUTER:
for i := range s.Checks {
for ii := range o.Checks {
if s.Checks[i].Equals(o.Checks[ii]) {
// Found match; continue with next check
continue OUTER
}
}
// No match
return false
}
if !s.Connect.Equals(o.Connect) {
return false
}
if s.Name != o.Name {
return false
}
if s.PortLabel != o.PortLabel {
return false
}
if !helper.CompareSliceSetString(s.Tags, o.Tags) {
return false
}
return true
}
// ConsulConnect represents a Consul Connect jobspec stanza.
type ConsulConnect struct {
// Native is true if a service implements Connect directly and does not
// need a sidecar.
Native bool
// SidecarService is non-nil if a service requires a sidecar.
SidecarService *ConsulSidecarService
}
// Copy the stanza recursively. Returns nil if nil.
func (c *ConsulConnect) Copy() *ConsulConnect {
if c == nil {
return nil
}
return &ConsulConnect{
Native: c.Native,
SidecarService: c.SidecarService.Copy(),
}
}
// Equals returns true if the structs are recursively equal.
func (c *ConsulConnect) Equals(o *ConsulConnect) bool {
if c == nil || o == nil {
return c == o
}
if c.Native != o.Native {
return false
}
return c.SidecarService.Equals(o.SidecarService)
}
// Validate that the Connect stanza has exactly one of Native or sidecar.
func (c *ConsulConnect) Validate() error {
if c == nil {
return nil
}
if c.Native && c.SidecarService != nil {
return fmt.Errorf("Consul Connect must be native or use a sidecar service; not both")
}
if !c.Native && c.SidecarService == nil {
return fmt.Errorf("Consul Connect must be native or use a sidecar service")
}
return nil
}
// ConsulSidecarService represents a Consul Connect SidecarService jobspec
// stanza.
type ConsulSidecarService struct {
// Port is the service's port that the sidecar will connect to. May be
// a port label or a literal port number.
Port string
// Proxy stanza defining the sidecar proxy configuration.
Proxy *ConsulProxy
}
// Copy the stanza recursively. Returns nil if nil.
func (s *ConsulSidecarService) Copy() *ConsulSidecarService {
return &ConsulSidecarService{
Port: s.Port,
Proxy: s.Proxy.Copy(),
}
}
// Equals returns true if the structs are recursively equal.
func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool {
if s == nil || o == nil {
return s == o
}
if s.Port != o.Port {
return false
}
return s.Proxy.Equals(o.Proxy)
}
// ConsulProxy represents a Consul Connect sidecar proxy jobspec stanza.
type ConsulProxy struct {
// Upstreams configures the upstream services this service intends to
// connect to.
Upstreams []*ConsulUpstream
// Config is a proxy configuration. It is opaque to Nomad and passed
// directly to Consul.
Config map[string]interface{}
}
// Copy the stanza recursively. Returns nil if nil.
func (p *ConsulProxy) Copy() *ConsulProxy {
if p == nil {
return nil
}
newP := ConsulProxy{}
if n := len(p.Upstreams); n > 0 {
newP.Upstreams = make([]*ConsulUpstream, n)
for i := range p.Upstreams {
newP.Upstreams[i] = p.Upstreams[i].Copy()
}
}
if n := len(p.Config); n > 0 {
newP.Config = make(map[string]interface{}, n)
for k, v := range p.Config {
newP.Config[k] = v
}
}
return &newP
}
// Equals returns true if the structs are recursively equal.
func (p *ConsulProxy) Equals(o *ConsulProxy) bool {
if p == nil || o == nil {
return p == o
}
if len(p.Upstreams) != len(o.Upstreams) {
return false
}
// Order doesn't matter
OUTER:
for _, up := range p.Upstreams {
for _, innerUp := range o.Upstreams {
if up.Equals(innerUp) {
// Match; find next upstream
continue OUTER
}
}
// No match
return false
}
// Avoid nil vs {} differences
if len(p.Config) != 0 && len(o.Config) != 0 {
if !reflect.DeepEqual(p.Config, o.Config) {
return false
}
}
return true
}
// ConsulUpstream represents a Consul Connect upstream jobspec stanza.
type ConsulUpstream struct {
// DestinationName is the name of the upstream service.
DestinationName string
// LocalBindPort is the port the proxy will receive connections for the
// upstream on.
LocalBindPort int
}
// Copy the stanza recursively. Returns nil if nil.
func (u *ConsulUpstream) Copy() *ConsulUpstream {
if u == nil {
return nil
}
return &ConsulUpstream{
DestinationName: u.DestinationName,
LocalBindPort: u.LocalBindPort,
}
}
// Equals returns true if the structs are recursively equal.
func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool {
if u == nil || o == nil {
return u == o
}
return (*u) == (*o)
}

View File

@@ -0,0 +1,62 @@
package structs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestConsulConnect_Validate(t *testing.T) {
t.Parallel()
c := &ConsulConnect{}
// An empty Connect stanza is invalid
require.Error(t, c.Validate())
// Native=true is valid
c.Native = true
require.NoError(t, c.Validate())
// Native=true + Sidecar!=nil is invalid
c.SidecarService = &ConsulSidecarService{}
require.Error(t, c.Validate())
// Native=false + Sidecar!=nil is valid
c.Native = false
require.NoError(t, c.Validate())
}
func TestConsulConnect_CopyEquals(t *testing.T) {
t.Parallel()
c := &ConsulConnect{
SidecarService: &ConsulSidecarService{
Port: "9001",
Proxy: &ConsulProxy{
Upstreams: []*ConsulUpstream{
{
DestinationName: "up1",
LocalBindPort: 9002,
},
{
DestinationName: "up2",
LocalBindPort: 9003,
},
},
Config: map[string]interface{}{
"foo": 1,
},
},
},
}
require.NoError(t, c.Validate())
// Copies should be equivalent
o := c.Copy()
require.True(t, c.Equals(o))
o.SidecarService.Proxy.Upstreams = nil
require.False(t, c.Equals(o))
}

View File

@@ -12,10 +12,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"math"
"net"
"net/url"
"os"
"path/filepath"
"reflect"
@@ -26,7 +24,6 @@ import (
"time"
"github.com/gorhill/cronexpr"
"github.com/hashicorp/consul/api"
hcodec "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
@@ -5039,485 +5036,6 @@ func (c *CheckRestart) Validate() error {
return mErr.ErrorOrNil()
}
const (
ServiceCheckHTTP = "http"
ServiceCheckTCP = "tcp"
ServiceCheckScript = "script"
ServiceCheckGRPC = "grpc"
// minCheckInterval is the minimum check interval permitted. Consul
// currently has its MinInterval set to 1s. Mirror that here for
// consistency.
minCheckInterval = 1 * time.Second
// minCheckTimeout is the minimum check timeout permitted for Consul
// script TTL checks.
minCheckTimeout = 1 * time.Second
)
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Command string // Command is the command to run for script checks
Args []string // Args is a list of arguments for script checks
Path string // path of the health check url for http type check
Protocol string // Protocol to use if check is http, defaults to http
PortLabel string // The port to use for tcp/http checks
AddressMode string // 'host' to use host ip:port or 'driver' to use driver's
Interval time.Duration // Interval of the check
Timeout time.Duration // Timeout of the response from the check before consul fails the check
InitialStatus string // Initial status of the check
TLSSkipVerify bool // Skip TLS verification when Protocol=https
Method string // HTTP Method to use (GET by default)
Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks
CheckRestart *CheckRestart // If and when a task should be restarted based on checks
GRPCService string // Service for GRPC checks
GRPCUseTLS bool // Whether or not to use TLS for GRPC checks
}
func (sc *ServiceCheck) Copy() *ServiceCheck {
if sc == nil {
return nil
}
nsc := new(ServiceCheck)
*nsc = *sc
nsc.Args = helper.CopySliceString(sc.Args)
nsc.Header = helper.CopyMapStringSliceString(sc.Header)
nsc.CheckRestart = sc.CheckRestart.Copy()
return nsc
}
func (sc *ServiceCheck) Equals(o *ServiceCheck) bool {
if sc == nil || o == nil {
return sc == o
}
if sc.Name != o.Name {
return false
}
if sc.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(sc.Args, o.Args) {
return false
}
if !sc.CheckRestart.Equals(o.CheckRestart) {
return false
}
if sc.Command != o.Command {
return false
}
if sc.GRPCService != o.GRPCService {
return false
}
if sc.GRPCUseTLS != o.GRPCUseTLS {
return false
}
// Use DeepEqual here as order of slice values could matter
if !reflect.DeepEqual(sc.Header, o.Header) {
return false
}
if sc.InitialStatus != o.InitialStatus {
return false
}
if sc.Interval != o.Interval {
return false
}
if sc.Method != o.Method {
return false
}
if sc.Path != o.Path {
return false
}
if sc.PortLabel != o.Path {
return false
}
if sc.Protocol != o.Protocol {
return false
}
if sc.TLSSkipVerify != o.TLSSkipVerify {
return false
}
if sc.Timeout != o.Timeout {
return false
}
if sc.Type != o.Type {
return false
}
return true
}
func (sc *ServiceCheck) Canonicalize(serviceName string) {
// Ensure empty maps/slices are treated as null to avoid scheduling
// issues when using DeepEquals.
if len(sc.Args) == 0 {
sc.Args = nil
}
if len(sc.Header) == 0 {
sc.Header = nil
} else {
for k, v := range sc.Header {
if len(v) == 0 {
sc.Header[k] = nil
}
}
}
if sc.Name == "" {
sc.Name = fmt.Sprintf("service: %q check", serviceName)
}
}
// validate a Service's ServiceCheck
func (sc *ServiceCheck) validate() error {
// Validate Type
switch strings.ToLower(sc.Type) {
case ServiceCheckGRPC:
case ServiceCheckTCP:
case ServiceCheckHTTP:
if sc.Path == "" {
return fmt.Errorf("http type must have a valid http path")
}
url, err := url.Parse(sc.Path)
if err != nil {
return fmt.Errorf("http type must have a valid http path")
}
if url.IsAbs() {
return fmt.Errorf("http type must have a relative http path")
}
case ServiceCheckScript:
if sc.Command == "" {
return fmt.Errorf("script type must have a valid script path")
}
default:
return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type)
}
// Validate interval and timeout
if sc.Interval == 0 {
return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval)
} else if sc.Interval < minCheckInterval {
return fmt.Errorf("interval (%v) cannot be lower than %v", sc.Interval, minCheckInterval)
}
if sc.Timeout == 0 {
return fmt.Errorf("missing required value timeout. Timeout cannot be less than %v", minCheckInterval)
} else if sc.Timeout < minCheckTimeout {
return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval)
}
// Validate InitialStatus
switch sc.InitialStatus {
case "":
case api.HealthPassing:
case api.HealthWarning:
case api.HealthCritical:
default:
return fmt.Errorf(`invalid initial check state (%s), must be one of %q, %q, %q or empty`, sc.InitialStatus, api.HealthPassing, api.HealthWarning, api.HealthCritical)
}
// Validate AddressMode
switch sc.AddressMode {
case "", AddressModeHost, AddressModeDriver:
// Ok
case AddressModeAuto:
return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto)
default:
return fmt.Errorf("invalid address_mode %q", sc.AddressMode)
}
return sc.CheckRestart.Validate()
}
// RequiresPort returns whether the service check requires the task has a port.
func (sc *ServiceCheck) RequiresPort() bool {
switch sc.Type {
case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP:
return true
default:
return false
}
}
// TriggersRestarts returns true if this check should be watched and trigger a restart
// on failure.
func (sc *ServiceCheck) TriggersRestarts() bool {
return sc.CheckRestart != nil && sc.CheckRestart.Limit > 0
}
// Hash all ServiceCheck fields and the check's corresponding service ID to
// create an identifier. The identifier is not guaranteed to be unique as if
// the PortLabel is blank, the Service's PortLabel will be used after Hash is
// called.
func (sc *ServiceCheck) Hash(serviceID string) string {
h := sha1.New()
io.WriteString(h, serviceID)
io.WriteString(h, sc.Name)
io.WriteString(h, sc.Type)
io.WriteString(h, sc.Command)
io.WriteString(h, strings.Join(sc.Args, ""))
io.WriteString(h, sc.Path)
io.WriteString(h, sc.Protocol)
io.WriteString(h, sc.PortLabel)
io.WriteString(h, sc.Interval.String())
io.WriteString(h, sc.Timeout.String())
io.WriteString(h, sc.Method)
// Only include TLSSkipVerify if set to maintain ID stability with Nomad <0.6
if sc.TLSSkipVerify {
io.WriteString(h, "true")
}
// Since map iteration order isn't stable we need to write k/v pairs to
// a slice and sort it before hashing.
if len(sc.Header) > 0 {
headers := make([]string, 0, len(sc.Header))
for k, v := range sc.Header {
headers = append(headers, k+strings.Join(v, ""))
}
sort.Strings(headers)
io.WriteString(h, strings.Join(headers, ""))
}
// Only include AddressMode if set to maintain ID stability with Nomad <0.7.1
if len(sc.AddressMode) > 0 {
io.WriteString(h, sc.AddressMode)
}
// Only include GRPC if set to maintain ID stability with Nomad <0.8.4
if sc.GRPCService != "" {
io.WriteString(h, sc.GRPCService)
}
if sc.GRPCUseTLS {
io.WriteString(h, "true")
}
return fmt.Sprintf("%x", h.Sum(nil))
}
const (
AddressModeAuto = "auto"
AddressModeHost = "host"
AddressModeDriver = "driver"
)
// Service represents a Consul service definition in Nomad
type Service struct {
// Name of the service registered with Consul. Consul defaults the
// Name to ServiceID if not specified. The Name if specified is used
// as one of the seed values when generating a Consul ServiceID.
Name string
// PortLabel is either the numeric port number or the `host:port`.
// To specify the port number using the host's Consul Advertise
// address, specify an empty host in the PortLabel (e.g. `:port`).
PortLabel string
// AddressMode specifies whether or not to use the host ip:port for
// this service.
AddressMode string
Tags []string // List of tags for the service
CanaryTags []string // List of tags for the service when it is a canary
Checks []*ServiceCheck // List of checks associated with the service
Connect *ConsulConnect // Consul Connect configuration
}
func (s *Service) Copy() *Service {
if s == nil {
return nil
}
ns := new(Service)
*ns = *s
ns.Tags = helper.CopySliceString(ns.Tags)
ns.CanaryTags = helper.CopySliceString(ns.CanaryTags)
if s.Checks != nil {
checks := make([]*ServiceCheck, len(ns.Checks))
for i, c := range ns.Checks {
checks[i] = c.Copy()
}
ns.Checks = checks
}
return ns
}
// Canonicalize interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) Canonicalize(job string, taskGroup string, task string) {
// Ensure empty lists are treated as null to avoid scheduler issues when
// using DeepEquals
if len(s.Tags) == 0 {
s.Tags = nil
}
if len(s.CanaryTags) == 0 {
s.CanaryTags = nil
}
if len(s.Checks) == 0 {
s.Checks = nil
}
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
"TASK": task,
"BASE": fmt.Sprintf("%s-%s-%s", job, taskGroup, task),
},
)
for _, check := range s.Checks {
check.Canonicalize(s.Name)
}
}
// Validate checks if the Check definition is valid
func (s *Service) Validate() error {
var mErr multierror.Error
// Ensure the service name is valid per the below RFCs but make an exception
// for our interpolation syntax by first stripping any environment variables from the name
serviceNameStripped := args.ReplaceEnvWithPlaceHolder(s.Name, "ENV-VAR")
if err := s.ValidateName(serviceNameStripped); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes: %q", s.Name))
}
switch s.AddressMode {
case "", AddressModeAuto, AddressModeHost, AddressModeDriver:
// OK
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode))
}
for _, c := range s.Checks {
if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: check requires a port but neither check nor service %+q have a port", c.Name, s.Name))
continue
}
if err := c.validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %s invalid: %v", c.Name, err))
}
}
return mErr.ErrorOrNil()
}
// ValidateName checks if the services Name is valid and should be called after
// the name has been interpolated
func (s *Service) ValidateName(name string) error {
// Ensure the service name is valid per RFC-952 §1
// (https://tools.ietf.org/html/rfc952), RFC-1123 §2.1
// (https://tools.ietf.org/html/rfc1123), and RFC-2782
// (https://tools.ietf.org/html/rfc2782).
re := regexp.MustCompile(`^(?i:[a-z0-9]|[a-z0-9][a-z0-9\-]{0,61}[a-z0-9])$`)
if !re.MatchString(name) {
return fmt.Errorf("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes and must be no longer than 63 characters: %q", name)
}
return nil
}
// Hash returns a base32 encoded hash of a Service's contents excluding checks
// as they're hashed independently.
func (s *Service) Hash(allocID, taskName string, canary bool) string {
h := sha1.New()
io.WriteString(h, allocID)
io.WriteString(h, taskName)
io.WriteString(h, s.Name)
io.WriteString(h, s.PortLabel)
io.WriteString(h, s.AddressMode)
for _, tag := range s.Tags {
io.WriteString(h, tag)
}
for _, tag := range s.CanaryTags {
io.WriteString(h, tag)
}
// Vary ID on whether or not CanaryTags will be used
if canary {
h.Write([]byte("Canary"))
}
// Base32 is used for encoding the hash as sha1 hashes can always be
// encoded without padding, only 4 bytes larger than base64, and saves
// 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice
// to have a reasonably compact URL-safe representation.
return b32.EncodeToString(h.Sum(nil))
}
func (s *Service) Equals(o *Service) bool {
if s == nil || o == nil {
return s == o
}
if s.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) {
return false
}
if len(s.Checks) != len(o.Checks) {
return false
}
OUTER:
for i := range s.Checks {
for ii := range o.Checks {
if s.Checks[i].Equals(o.Checks[ii]) {
// Found match; continue with next check
continue OUTER
}
}
// No match
return false
}
if !s.Connect.Equals(o.Connect) {
return false
}
if s.Name != o.Name {
return false
}
if s.PortLabel != o.PortLabel {
return false
}
if !helper.CompareSliceSetString(s.Tags, o.Tags) {
return false
}
return true
}
const (
// DefaultKillTimeout is the default timeout between signaling a task it
// will be killed and killing it.

View File

@@ -2235,6 +2235,69 @@ func TestService_Canonicalize(t *testing.T) {
}
func TestService_Validate(t *testing.T) {
s := Service{
Name: "testservice",
}
s.Canonicalize("testjob", "testgroup", "testtask")
// Base service should be valid
require.NoError(t, s.Validate())
// Native Connect should be valid
s.Connect = &ConsulConnect{
Native: true,
}
require.NoError(t, s.Validate())
// Native Connect + Sidecar should be invalid
s.Connect.SidecarService = &ConsulSidecarService{}
require.Error(t, s.Validate())
}
func TestService_Equals(t *testing.T) {
s := Service{
Name: "testservice",
}
s.Canonicalize("testjob", "testgroup", "testtask")
o := s.Copy()
// Base service should be equal to copy of itself
require.True(t, s.Equals(o))
// create a helper to assert a diff and reset the struct
assertDiff := func() {
require.False(t, s.Equals(o))
o = s.Copy()
require.True(t, s.Equals(o), "bug in copy")
}
// Changing any field should cause inequality
o.Name = "diff"
assertDiff()
o.PortLabel = "diff"
assertDiff()
o.AddressMode = AddressModeDriver
assertDiff()
o.Tags = []string{"diff"}
assertDiff()
o.CanaryTags = []string{"diff"}
assertDiff()
o.Checks = []*ServiceCheck{{Name: "diff"}}
assertDiff()
o.Connect = &ConsulConnect{Native: true}
assertDiff()
}
func TestJob_ExpandServiceNames(t *testing.T) {
j := &Job{
Name: "my-job",