mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Merge pull request #7192 from hashicorp/b-connect-stanza-ignore
consul/connect: in-place update sidecar service registrations on changes
This commit is contained in:
@@ -109,7 +109,17 @@ type ACLsAPI interface {
|
||||
// agentServiceUpdateRequired checks if any critical fields in Nomad's version
|
||||
// of a service definition are different from the existing service definition as
|
||||
// known by Consul.
|
||||
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
|
||||
//
|
||||
// reason - The syncReason that triggered this synchronization with the consul
|
||||
// agent API.
|
||||
// wanted - Nomad's view of what the service definition is intended to be.
|
||||
// Not nil.
|
||||
// existing - Consul's view (agent, not catalog) of the actual service definition.
|
||||
// Not nil.
|
||||
// sidecar - Consul's view (agent, not catalog) of the service definition of the sidecar
|
||||
// associated with existing that may or may not exist.
|
||||
// May be nil.
|
||||
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool {
|
||||
switch reason {
|
||||
case syncPeriodic:
|
||||
// In a periodic sync with Consul, we need to respect the value of
|
||||
@@ -123,31 +133,39 @@ func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegis
|
||||
//
|
||||
// We do so by over-writing the nomad service registration by the value
|
||||
// of the tags that Consul contains, if enable_tag_override = true.
|
||||
maybeTweakTags(wanted, existing)
|
||||
return different(wanted, existing)
|
||||
maybeTweakTags(wanted, existing, sidecar)
|
||||
return different(wanted, existing, sidecar)
|
||||
|
||||
default:
|
||||
// A non-periodic sync with Consul indicates an operation has been set
|
||||
// on the queue. This happens when service has been added / removed / modified
|
||||
// and implies the Consul agent should be sync'd with nomad, because
|
||||
// nomad is the ultimate source of truth for the service definition.
|
||||
return different(wanted, existing)
|
||||
return different(wanted, existing, sidecar)
|
||||
}
|
||||
}
|
||||
|
||||
// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if
|
||||
// EnableTagOverride is true. Otherwise the wanted service registration is left
|
||||
// unchanged.
|
||||
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
|
||||
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) {
|
||||
if wanted.EnableTagOverride {
|
||||
wanted.Tags = helper.CopySliceString(existing.Tags)
|
||||
// If the service registration also defines a sidecar service, use the ETO
|
||||
// setting for the parent service to also apply to the sidecar.
|
||||
if wanted.Connect != nil && wanted.Connect.SidecarService != nil {
|
||||
if sidecar != nil {
|
||||
wanted.Connect.SidecarService.Tags = helper.CopySliceString(sidecar.Tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// different compares the wanted state of the service registration with the actual
|
||||
// (cached) state of the service registration reported by Consul. If any of the
|
||||
// critical fields are not deeply equal, they considered different.
|
||||
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
|
||||
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool {
|
||||
|
||||
return !(wanted.Kind == existing.Kind &&
|
||||
wanted.ID == existing.ID &&
|
||||
wanted.Port == existing.Port &&
|
||||
@@ -155,7 +173,25 @@ func different(wanted *api.AgentServiceRegistration, existing *api.AgentService)
|
||||
wanted.Name == existing.Service &&
|
||||
wanted.EnableTagOverride == existing.EnableTagOverride &&
|
||||
reflect.DeepEqual(wanted.Meta, existing.Meta) &&
|
||||
reflect.DeepEqual(wanted.Tags, existing.Tags))
|
||||
reflect.DeepEqual(wanted.Tags, existing.Tags) &&
|
||||
!connectSidecarDifferent(wanted, sidecar))
|
||||
}
|
||||
|
||||
func connectSidecarDifferent(wanted *api.AgentServiceRegistration, sidecar *api.AgentService) bool {
|
||||
if wanted.Connect != nil && wanted.Connect.SidecarService != nil {
|
||||
if sidecar == nil {
|
||||
// consul lost our sidecar (?)
|
||||
return true
|
||||
}
|
||||
if !reflect.DeepEqual(wanted.Connect.SidecarService.Tags, sidecar.Tags) {
|
||||
// tags on the nomad definition have been modified
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// There is no connect sidecar the nomad side; let consul anti-entropy worry
|
||||
// about any registration on the consul side.
|
||||
return false
|
||||
}
|
||||
|
||||
// operations are submitted to the main loop via commit() for synchronizing
|
||||
@@ -365,7 +401,8 @@ func (c *ServiceClient) hasSeen() bool {
|
||||
// syncReason indicates why a sync operation with consul is about to happen.
|
||||
//
|
||||
// The trigger for a sync may have implications on the behavior of the sync itself.
|
||||
// In particular, if a service is defined with enable_tag_override=true
|
||||
// In particular if a service is defined with enable_tag_override=true, the sync
|
||||
// should ignore changes to the service's Tags field.
|
||||
type syncReason byte
|
||||
|
||||
const (
|
||||
@@ -579,25 +616,20 @@ func (c *ServiceClient) sync(reason syncReason) error {
|
||||
}
|
||||
|
||||
// Add Nomad services missing from Consul, or where the service has been updated.
|
||||
for id, local := range c.services {
|
||||
existingSvc, ok := consulServices[id]
|
||||
for id, serviceInNomad := range c.services {
|
||||
|
||||
if ok {
|
||||
// There is an existing registration of this service in Consul, so here
|
||||
// we validate to see if the service has been invalidated to see if it
|
||||
// should be updated.
|
||||
if !agentServiceUpdateRequired(reason, local, existingSvc) {
|
||||
// No Need to update services that have not changed
|
||||
continue
|
||||
serviceInConsul, exists := consulServices[id]
|
||||
sidecarInConsul := getNomadSidecar(id, consulServices)
|
||||
|
||||
if !exists || agentServiceUpdateRequired(reason, serviceInNomad, serviceInConsul, sidecarInConsul) {
|
||||
if err = c.client.ServiceRegister(serviceInNomad); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
sreg++
|
||||
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
|
||||
}
|
||||
|
||||
if err = c.client.ServiceRegister(local); err != nil {
|
||||
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
||||
return err
|
||||
}
|
||||
sreg++
|
||||
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
|
||||
}
|
||||
|
||||
// Remove Nomad checks in Consul but unknown locally
|
||||
@@ -1318,6 +1350,10 @@ func isOldNomadService(id string) bool {
|
||||
return strings.HasPrefix(id, prefix)
|
||||
}
|
||||
|
||||
const (
|
||||
sidecarSuffix = "-sidecar-proxy"
|
||||
)
|
||||
|
||||
// isNomadSidecar returns true if the ID matches a sidecar proxy for a Nomad
|
||||
// managed service.
|
||||
//
|
||||
@@ -1330,16 +1366,29 @@ func isOldNomadService(id string) bool {
|
||||
// _nomad-task-5229c7f8-376b-3ccc-edd9-981e238f7033-cache-redis-cache-db-sidecar-proxy
|
||||
//
|
||||
func isNomadSidecar(id string, services map[string]*api.AgentServiceRegistration) bool {
|
||||
const suffix = "-sidecar-proxy"
|
||||
if !strings.HasSuffix(id, suffix) {
|
||||
if !strings.HasSuffix(id, sidecarSuffix) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Make sure the Nomad managed service for this proxy still exists.
|
||||
_, ok := services[id[:len(id)-len(suffix)]]
|
||||
_, ok := services[id[:len(id)-len(sidecarSuffix)]]
|
||||
return ok
|
||||
}
|
||||
|
||||
// getNomadSidecar returns the service registration of the sidecar for the managed
|
||||
// service with the specified id.
|
||||
//
|
||||
// If the managed service of the specified id does not exist, or the service does
|
||||
// not have a sidecar proxy, nil is returned.
|
||||
func getNomadSidecar(id string, services map[string]*api.AgentService) *api.AgentService {
|
||||
if _, exists := services[id]; !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
sidecarID := id + sidecarSuffix
|
||||
return services[sidecarID]
|
||||
}
|
||||
|
||||
// getAddress returns the IP and port to use for a service or check. If no port
|
||||
// label is specified (an empty value), zero values are returned because no
|
||||
// address could be resolved.
|
||||
|
||||
@@ -1,37 +1,58 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
// the service as known by nomad
|
||||
wanted = api.AgentServiceRegistration{
|
||||
Kind: "",
|
||||
ID: "aca4c175-1778-5ef4-0220-2ab434147d35",
|
||||
Name: "myservice",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
Connect: &api.AgentServiceConnect{
|
||||
Native: false,
|
||||
SidecarService: &api.AgentServiceRegistration{
|
||||
Kind: "connect-proxy",
|
||||
ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy",
|
||||
Name: "name-sidecar-proxy",
|
||||
Tags: []string{"x", "y", "z"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// the service (and + connect proxy) as known by consul
|
||||
existing = &api.AgentService{
|
||||
Kind: "",
|
||||
ID: "aca4c175-1778-5ef4-0220-2ab434147d35",
|
||||
Service: "myservice",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
}
|
||||
|
||||
sidecar = &api.AgentService{
|
||||
Kind: "connect-proxy",
|
||||
ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy",
|
||||
Service: "myservice-sidecar-proxy",
|
||||
Tags: []string{"x", "y", "z"},
|
||||
}
|
||||
)
|
||||
|
||||
func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wanted := api.AgentServiceRegistration{
|
||||
Kind: "service",
|
||||
ID: "_id",
|
||||
Name: "name",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
}
|
||||
|
||||
existing := &api.AgentService{
|
||||
Kind: "service",
|
||||
ID: "_id",
|
||||
Service: "name",
|
||||
Tags: []string{"a", "b"},
|
||||
Port: 9000,
|
||||
Address: "1.1.1.1",
|
||||
EnableTagOverride: true,
|
||||
Meta: map[string]string{"foo": "1"},
|
||||
}
|
||||
|
||||
// By default wanted and existing match. Each test should modify wanted in
|
||||
// 1 way, and / or configure the type of sync operation that is being
|
||||
// considered, then evaluate the result of the update-required algebra.
|
||||
@@ -44,7 +65,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
exp bool,
|
||||
reason syncReason,
|
||||
tweak tweaker) {
|
||||
result := agentServiceUpdateRequired(reason, tweak(wanted), existing)
|
||||
result := agentServiceUpdateRequired(reason, tweak(wanted), existing, sidecar)
|
||||
require.Equal(t, exp, result)
|
||||
}
|
||||
|
||||
@@ -103,7 +124,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags syncNewOps eto->true", func(t *testing.T) {
|
||||
t.Run("different tags syncNewOps eto=true", func(t *testing.T) {
|
||||
// sync is required even though eto=true, because NewOps indicates the
|
||||
// service definition in nomad has changed (e.g. job run a modified job)
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
@@ -112,7 +133,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags syncPeriodic eto->true", func(t *testing.T) {
|
||||
t.Run("different tags syncPeriodic eto=true", func(t *testing.T) {
|
||||
// sync is not required since eto=true and this is a periodic sync
|
||||
// with consul - in which case we keep Consul's definition of the tags
|
||||
try(t, false, syncPeriodic, func(w asr) *asr {
|
||||
@@ -121,11 +142,29 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different sidecar tags on syncPeriodic eto=true", func(t *testing.T) {
|
||||
try(t, false, syncPeriodic, func(w asr) *asr {
|
||||
// like the parent service, the sidecar's tags do not get enforced
|
||||
// if ETO is true and this is a periodic sync
|
||||
w.Connect.SidecarService.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different sidecar tags on syncNewOps eto=true", func(t *testing.T) {
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
// like the parent service, the sidecar's tags always get enforced
|
||||
// regardless of ETO if this is a sync due to applied operations
|
||||
w.Connect.SidecarService.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
// for remaining tests, EnableTagOverride = false
|
||||
wanted.EnableTagOverride = false
|
||||
existing.EnableTagOverride = false
|
||||
|
||||
t.Run("different tags : syncPeriodic : eto->false", func(t *testing.T) {
|
||||
t.Run("different tags syncPeriodic eto=false", func(t *testing.T) {
|
||||
// sync is required because eto=false and the tags do not match
|
||||
try(t, true, syncPeriodic, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
@@ -133,28 +172,100 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different tags : syncNewOps : eto->false", func(t *testing.T) {
|
||||
// sync is required because it was triggered by NewOps and the tags
|
||||
// do not match
|
||||
t.Run("different tags syncNewOps eto=false", func(t *testing.T) {
|
||||
// sync is required because eto=false and the tags do not match
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different sidecar tags on syncPeriodic eto=false", func(t *testing.T) {
|
||||
// like the parent service, sync is required because eto=false and the
|
||||
// sidecar's tags do not match
|
||||
try(t, true, syncPeriodic, func(w asr) *asr {
|
||||
w.Connect.SidecarService.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("different sidecar tags syncNewOps eto=false", func(t *testing.T) {
|
||||
// like the parent service, sync is required because eto=false and the
|
||||
// sidecar's tags do not match
|
||||
try(t, true, syncNewOps, func(w asr) *asr {
|
||||
w.Connect.SidecarService.Tags = []string{"other", "tags"}
|
||||
return &w
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncLogic_maybeTweakTags(t *testing.T) {
|
||||
t.Parallel()
|
||||
r := require.New(t)
|
||||
|
||||
wanted := &api.AgentServiceRegistration{Tags: []string{"original"}}
|
||||
existing := &api.AgentService{Tags: []string{"other"}}
|
||||
maybeTweakTags(wanted, existing)
|
||||
r.Equal([]string{"original"}, wanted.Tags)
|
||||
differentPointers := func(a, b []string) bool {
|
||||
return &(a) != &(b)
|
||||
}
|
||||
|
||||
wantedETO := &api.AgentServiceRegistration{Tags: []string{"original"}, EnableTagOverride: true}
|
||||
existingETO := &api.AgentService{Tags: []string{"other"}, EnableTagOverride: true}
|
||||
maybeTweakTags(wantedETO, existingETO)
|
||||
r.Equal(existingETO.Tags, wantedETO.Tags)
|
||||
r.False(&(existingETO.Tags) == &(wantedETO.Tags))
|
||||
try := func(inConsul, inConsulSC []string, eto bool) {
|
||||
wanted := &api.AgentServiceRegistration{
|
||||
Tags: []string{"original"},
|
||||
Connect: &api.AgentServiceConnect{
|
||||
SidecarService: &api.AgentServiceRegistration{
|
||||
Tags: []string{"original-sidecar"},
|
||||
},
|
||||
},
|
||||
EnableTagOverride: eto,
|
||||
}
|
||||
|
||||
existing := &api.AgentService{Tags: inConsul}
|
||||
sidecar := &api.AgentService{Tags: inConsulSC}
|
||||
|
||||
maybeTweakTags(wanted, existing, sidecar)
|
||||
|
||||
switch eto {
|
||||
case false:
|
||||
require.Equal(t, []string{"original"}, wanted.Tags)
|
||||
require.Equal(t, []string{"original-sidecar"}, wanted.Connect.SidecarService.Tags)
|
||||
require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags))
|
||||
case true:
|
||||
require.Equal(t, inConsul, wanted.Tags)
|
||||
require.Equal(t, inConsulSC, wanted.Connect.SidecarService.Tags)
|
||||
require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags))
|
||||
}
|
||||
}
|
||||
|
||||
try([]string{"original"}, []string{"original-sidecar"}, true)
|
||||
try([]string{"original"}, []string{"original-sidecar"}, false)
|
||||
try([]string{"modified"}, []string{"original-sidecar"}, true)
|
||||
try([]string{"modified"}, []string{"original-sidecar"}, false)
|
||||
try([]string{"original"}, []string{"modified-sidecar"}, true)
|
||||
try([]string{"original"}, []string{"modified-sidecar"}, false)
|
||||
try([]string{"modified"}, []string{"modified-sidecar"}, true)
|
||||
try([]string{"modified"}, []string{"modified-sidecar"}, false)
|
||||
}
|
||||
|
||||
func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Check the edge cases where the connect service is deleted on the nomad
|
||||
// side (i.e. are we checking multiple nil pointers).
|
||||
|
||||
try := func(asr *api.AgentServiceRegistration) {
|
||||
maybeTweakTags(asr, existing, sidecar)
|
||||
require.False(t, reflect.DeepEqual([]string{"original"}, asr.Tags))
|
||||
}
|
||||
|
||||
try(&api.AgentServiceRegistration{
|
||||
Tags: []string{"original"},
|
||||
EnableTagOverride: true,
|
||||
Connect: nil, // ooh danger!
|
||||
})
|
||||
|
||||
try(&api.AgentServiceRegistration{
|
||||
Tags: []string{"original"},
|
||||
EnableTagOverride: true,
|
||||
Connect: &api.AgentServiceConnect{
|
||||
SidecarService: nil, // ooh danger!
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user