Merge pull request #13869 from hashicorp/b-uniq-services-2

servicedisco: ensure service uniqueness in job validation
This commit is contained in:
Seth Hoenig
2022-07-21 08:24:24 -05:00
committed by GitHub
2 changed files with 69 additions and 18 deletions

3
.changelog/13869.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
servicedisco: Fixed a bug where non-unique services would escape job validation
```

View File

@@ -25,12 +25,10 @@ import (
"time"
jwt "github.com/golang-jwt/jwt/v4"
"github.com/hashicorp/nomad/helper/escapingfs"
"golang.org/x/crypto/blake2b"
"github.com/hashicorp/cronexpr"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/command/agent/host"
@@ -38,12 +36,14 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/helper/escapingfs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/lib/kheap"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/miekg/dns"
"github.com/mitchellh/copystructure"
"golang.org/x/crypto/blake2b"
)
var (
@@ -6522,6 +6522,7 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, outer)
}
}
return mErr.ErrorOrNil()
}
@@ -6623,41 +6624,69 @@ func (tg *TaskGroup) validateNetworks() error {
// group service checks that refer to tasks only refer to tasks that exist.
func (tg *TaskGroup) validateServices() error {
var mErr multierror.Error
knownTasks := make(map[string]struct{})
// Track the providers used for this task group. Currently, Nomad only
// Accumulate task names in this group
taskSet := set.New[string](len(tg.Tasks))
// each service in a group must be unique (i.e. used in MakeAllocServiceID)
type unique struct {
name string
task string
port string
}
// Accumulate service IDs in this group
idSet := set.New[unique](0)
// Accumulate IDs that are duplicates
idDuplicateSet := set.New[unique](0)
// Accumulate the providers used for this task group. Currently, Nomad only
// allows the use of a single service provider within a task group.
configuredProviders := make(map[string]struct{})
providerSet := set.New[string](1)
// Record the known tasks and their services so we can compare them
// against the group-level services and checks
for _, task := range tg.Tasks {
knownTasks[task.Name] = struct{}{}
if task.Services == nil {
taskSet.Insert(task.Name)
if len(task.Services) == 0 {
continue
}
for _, service := range task.Services {
// Ensure no task-level checks specify a task
for _, check := range service.Checks {
if check.TaskName != "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Check %s is invalid: only task group service checks can be assigned tasks", check.Name))
}
}
// Add the service provider to the tracking, if it has not already
// been seen.
if _, ok := configuredProviders[service.Provider]; !ok {
configuredProviders[service.Provider] = struct{}{}
// Track that we have seen this service id
id := unique{service.Name, task.Name, service.PortLabel}
if !idSet.Insert(id) {
// accumulate duplicates for a single error later on
idDuplicateSet.Insert(id)
}
// Track that we have seen this service provider
providerSet.Insert(service.Provider)
}
}
for i, service := range tg.Services {
// Add the service provider to the tracking, if it has not already been
// seen.
if _, ok := configuredProviders[service.Provider]; !ok {
configuredProviders[service.Provider] = struct{}{}
// Track that we have seen this service id
id := unique{service.Name, "group", service.PortLabel}
if !idSet.Insert(id) {
// accumulate duplicates for a single error later on
idDuplicateSet.Insert(id)
}
// Track that we have seen this service provider
providerSet.Insert(service.Provider)
if err := service.Validate(); err != nil {
outer := fmt.Errorf("Service[%d] %s validation failed: %s", i, service.Name, err)
mErr.Errors = append(mErr.Errors, outer)
@@ -6679,7 +6708,7 @@ func (tg *TaskGroup) validateServices() error {
if check.AddressMode == AddressModeDriver {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Check %q invalid: cannot use address_mode=\"driver\", only checks defined in a \"task\" service block can use this mode", service.Name))
}
if _, ok := knownTasks[check.TaskName]; !ok {
if !taskSet.Contains(check.TaskName) {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Check %s invalid: refers to non-existent task %s", check.Name, check.TaskName))
}
@@ -6687,10 +6716,29 @@ func (tg *TaskGroup) validateServices() error {
}
}
// Produce an error for any services that are not unique enough in the group,
// i.e. that have the same <task, name, port>
if idDuplicateSet.Size() > 0 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf(
"Services are not unique: %s",
idDuplicateSet.String(
func(u unique) string {
s := u.task + "->" + u.name
if u.port != "" {
s += ":" + u.port
}
return s
},
),
),
)
}
// The initial feature release of native service discovery only allows for
// a single service provider to be used across all services in a task
// group.
if len(configuredProviders) > 1 {
if providerSet.Size() > 1 {
mErr.Errors = append(mErr.Errors,
errors.New("Multiple service providers used: task group services must use the same provider"))
}