mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
consul: fix constraints for non-default clusters (ENT) (#19449)
The Connect-related constraints injected by the Connect job mutating hook do not account for non-default `consul` blocks (for Nomad Enterprise). This works when both the default and non-default clusters are available and are the same version, but not when they do not. Fixes: https://github.com/hashicorp/nomad/issues/19442
This commit is contained in:
3
.changelog/19449.txt
Normal file
3
.changelog/19449.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:bug
|
||||
consul (Enterprise): Fixed a bug where implicit Consul constraints were not specific to non-default Consul clusters
|
||||
```
|
||||
@@ -80,40 +80,56 @@ func connectGatewayDriverConfig(hostNetwork bool) map[string]interface{} {
|
||||
// the proper Consul version is used that supports the necessary Connect
|
||||
// features. This includes bootstrapping envoy with a unix socket for Consul's
|
||||
// gRPC xDS API, and support for generating local service identity tokens.
|
||||
func connectSidecarVersionConstraint() *structs.Constraint {
|
||||
return &structs.Constraint{
|
||||
LTarget: "${attr.consul.version}",
|
||||
RTarget: ">= 1.8.0",
|
||||
Operand: structs.ConstraintSemver,
|
||||
func connectSidecarVersionConstraint(cluster string) *structs.Constraint {
|
||||
if cluster != structs.ConsulDefaultCluster && cluster != "" {
|
||||
return &structs.Constraint{
|
||||
LTarget: fmt.Sprintf("${attr.consul.%s.version}", cluster),
|
||||
RTarget: ">= 1.8.0",
|
||||
Operand: structs.ConstraintSemver,
|
||||
}
|
||||
}
|
||||
return consulServiceDiscoveryConstraint
|
||||
}
|
||||
|
||||
// connectGatewayVersionConstraint is used when building a connect gateway
|
||||
// task to ensure proper Consul version is used that supports Connect Gateway
|
||||
// features. This includes making use of Consul Configuration Entries of type
|
||||
// {ingress,terminating,mesh}-gateway.
|
||||
func connectGatewayVersionConstraint() *structs.Constraint {
|
||||
return &structs.Constraint{
|
||||
LTarget: "${attr.consul.version}",
|
||||
RTarget: ">= 1.8.0",
|
||||
Operand: structs.ConstraintSemver,
|
||||
func connectGatewayVersionConstraint(cluster string) *structs.Constraint {
|
||||
if cluster != structs.ConsulDefaultCluster && cluster != "" {
|
||||
return &structs.Constraint{
|
||||
LTarget: fmt.Sprintf("${attr.consul.%s.version}", cluster),
|
||||
RTarget: ">= 1.8.0",
|
||||
Operand: structs.ConstraintSemver,
|
||||
}
|
||||
}
|
||||
return consulServiceDiscoveryConstraint
|
||||
}
|
||||
|
||||
// connectGatewayTLSVersionConstraint is used when building a connect gateway
|
||||
// task to ensure proper Consul version is used that supports customized TLS version.
|
||||
// https://github.com/hashicorp/consul/pull/11576
|
||||
func connectGatewayTLSVersionConstraint() *structs.Constraint {
|
||||
func connectGatewayTLSVersionConstraint(cluster string) *structs.Constraint {
|
||||
attr := "${attr.consul.version}"
|
||||
if cluster != structs.ConsulDefaultCluster {
|
||||
attr = fmt.Sprintf("${attr.consul.%s.version}", cluster)
|
||||
}
|
||||
|
||||
return &structs.Constraint{
|
||||
LTarget: "${attr.consul.version}",
|
||||
LTarget: attr,
|
||||
RTarget: ">= 1.11.2",
|
||||
Operand: structs.ConstraintSemver,
|
||||
}
|
||||
}
|
||||
|
||||
func connectListenerConstraint() *structs.Constraint {
|
||||
func connectListenerConstraint(cluster string) *structs.Constraint {
|
||||
attr := "${attr.consul.grpc}"
|
||||
if cluster != structs.ConsulDefaultCluster {
|
||||
attr = fmt.Sprintf("${attr.consul.%s.grpc}", cluster)
|
||||
}
|
||||
|
||||
return &structs.Constraint{
|
||||
LTarget: "${attr.consul.grpc}",
|
||||
LTarget: attr,
|
||||
RTarget: "0",
|
||||
Operand: ">",
|
||||
}
|
||||
@@ -277,7 +293,8 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
|
||||
// If the task doesn't already exist, create a new one and add it to the job
|
||||
if task == nil {
|
||||
driver := groupConnectGuessTaskDriver(g)
|
||||
task = newConnectSidecarTask(service.Name, driver)
|
||||
cluster := service.GetConsulClusterName(g)
|
||||
task = newConnectSidecarTask(service.Name, driver, cluster)
|
||||
|
||||
// If there happens to be a task defined with the same name
|
||||
// append an UUID fragment to the task name
|
||||
@@ -357,7 +374,8 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
|
||||
netHost := netMode == "host"
|
||||
customizedTLS := service.Connect.IsCustomizedTLS()
|
||||
|
||||
task := newConnectGatewayTask(prefix, service.Name, netHost, customizedTLS)
|
||||
task := newConnectGatewayTask(prefix, service.Name,
|
||||
service.GetConsulClusterName(g), netHost, customizedTLS)
|
||||
g.Tasks = append(g.Tasks, task)
|
||||
|
||||
// the connect.sidecar_task block can also be used to configure
|
||||
@@ -476,13 +494,13 @@ func gatewayBindAddressesIngressForBridge(ingress *structs.ConsulIngressConfigEn
|
||||
return addresses
|
||||
}
|
||||
|
||||
func newConnectGatewayTask(prefix, service string, netHost, customizedTls bool) *structs.Task {
|
||||
func newConnectGatewayTask(prefix, service, cluster string, netHost, customizedTls bool) *structs.Task {
|
||||
constraints := structs.Constraints{
|
||||
connectGatewayVersionConstraint(),
|
||||
connectListenerConstraint(),
|
||||
connectGatewayVersionConstraint(cluster),
|
||||
connectListenerConstraint(cluster),
|
||||
}
|
||||
if customizedTls {
|
||||
constraints = append(constraints, connectGatewayTLSVersionConstraint())
|
||||
constraints = append(constraints, connectGatewayTLSVersionConstraint(cluster))
|
||||
}
|
||||
return &structs.Task{
|
||||
// Name is used in container name so must start with '[A-Za-z0-9]'
|
||||
@@ -500,7 +518,11 @@ func newConnectGatewayTask(prefix, service string, netHost, customizedTls bool)
|
||||
}
|
||||
}
|
||||
|
||||
func newConnectSidecarTask(service, driver string) *structs.Task {
|
||||
func newConnectSidecarTask(service, driver, cluster string) *structs.Task {
|
||||
|
||||
versionConstraint := connectSidecarVersionConstraint(cluster)
|
||||
listenerConstraint := connectListenerConstraint(cluster)
|
||||
|
||||
return &structs.Task{
|
||||
// Name is used in container name so must start with '[A-Za-z0-9]'
|
||||
Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service),
|
||||
@@ -517,10 +539,7 @@ func newConnectSidecarTask(service, driver string) *structs.Task {
|
||||
Hook: structs.TaskLifecycleHookPrestart,
|
||||
Sidecar: true,
|
||||
},
|
||||
Constraints: structs.Constraints{
|
||||
connectSidecarVersionConstraint(),
|
||||
connectListenerConstraint(),
|
||||
},
|
||||
Constraints: structs.Constraints{versionConstraint, listenerConstraint},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -112,11 +112,11 @@ func TestJobEndpointConnect_groupConnectGuessTaskDriver(t *testing.T) {
|
||||
func TestJobEndpointConnect_newConnectSidecarTask(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
task := newConnectSidecarTask("redis", "podman")
|
||||
task := newConnectSidecarTask("redis", "podman", structs.ConsulDefaultCluster)
|
||||
must.Eq(t, "connect-proxy-redis", task.Name)
|
||||
must.Eq(t, "podman", task.Driver)
|
||||
|
||||
task2 := newConnectSidecarTask("db", "docker")
|
||||
task2 := newConnectSidecarTask("db", "docker", structs.ConsulDefaultCluster)
|
||||
must.Eq(t, "connect-proxy-db", task2.Name)
|
||||
must.Eq(t, "docker", task2.Driver)
|
||||
}
|
||||
@@ -154,8 +154,8 @@ func TestJobEndpointConnect_groupConnectHook(t *testing.T) {
|
||||
// Expected tasks
|
||||
tgExp := job.TaskGroups[0].Copy()
|
||||
tgExp.Tasks = []*structs.Task{
|
||||
newConnectSidecarTask("backend", "docker"),
|
||||
newConnectSidecarTask("admin", "docker"),
|
||||
newConnectSidecarTask("backend", "docker", structs.ConsulDefaultCluster),
|
||||
newConnectSidecarTask("admin", "docker", structs.ConsulDefaultCluster),
|
||||
}
|
||||
tgExp.Services[0].Name = "backend"
|
||||
tgExp.Services[1].Name = "admin"
|
||||
@@ -200,7 +200,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_BridgeNetwork(t *tes
|
||||
expTG := job.TaskGroups[0].Copy()
|
||||
expTG.Tasks = []*structs.Task{
|
||||
// inject the gateway task
|
||||
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway", false, true),
|
||||
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway",
|
||||
structs.ConsulDefaultCluster, false, true),
|
||||
}
|
||||
expTG.Services[0].Name = "my-gateway"
|
||||
expTG.Tasks[0].Canonicalize(job, expTG)
|
||||
@@ -239,7 +240,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_HostNetwork(t *testi
|
||||
expTG := job.TaskGroups[0].Copy()
|
||||
expTG.Tasks = []*structs.Task{
|
||||
// inject the gateway task
|
||||
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway", true, false),
|
||||
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway",
|
||||
structs.ConsulDefaultCluster, true, false),
|
||||
}
|
||||
expTG.Services[0].Name = "my-gateway"
|
||||
expTG.Tasks[0].Canonicalize(job, expTG)
|
||||
@@ -305,8 +307,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_CustomTask(t *testin
|
||||
ShutdownDelay: 5 * time.Second,
|
||||
KillSignal: "SIGHUP",
|
||||
Constraints: structs.Constraints{
|
||||
connectGatewayVersionConstraint(),
|
||||
connectListenerConstraint(),
|
||||
connectGatewayVersionConstraint(structs.ConsulDefaultCluster),
|
||||
connectListenerConstraint(structs.ConsulDefaultCluster),
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -341,7 +343,8 @@ func TestJobEndpointConnect_groupConnectHook_TerminatingGateway(t *testing.T) {
|
||||
expTG := job.TaskGroups[0].Copy()
|
||||
expTG.Tasks = []*structs.Task{
|
||||
// inject the gateway task
|
||||
newConnectGatewayTask(structs.ConnectTerminatingPrefix, "my-gateway", false, false),
|
||||
newConnectGatewayTask(structs.ConnectTerminatingPrefix, "my-gateway",
|
||||
structs.ConsulDefaultCluster, false, false),
|
||||
}
|
||||
expTG.Services[0].Name = "my-gateway"
|
||||
expTG.Tasks[0].Canonicalize(job, expTG)
|
||||
@@ -375,7 +378,8 @@ func TestJobEndpointConnect_groupConnectHook_MeshGateway(t *testing.T) {
|
||||
expTG := job.TaskGroups[0].Copy()
|
||||
expTG.Tasks = []*structs.Task{
|
||||
// inject the gateway task
|
||||
newConnectGatewayTask(structs.ConnectMeshPrefix, "my-gateway", false, false),
|
||||
newConnectGatewayTask(structs.ConnectMeshPrefix, "my-gateway",
|
||||
structs.ConsulDefaultCluster, false, false),
|
||||
}
|
||||
expTG.Services[0].Name = "my-gateway"
|
||||
expTG.Services[0].PortLabel = "public_port"
|
||||
@@ -694,20 +698,36 @@ func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
t.Run("ingress", func(t *testing.T) {
|
||||
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo", true, false)
|
||||
require.Equal(t, "connect-ingress-foo", task.Name)
|
||||
require.Equal(t, "connect-ingress:foo", string(task.Kind))
|
||||
require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget)
|
||||
require.Equal(t, "host", task.Config["network_mode"])
|
||||
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo",
|
||||
structs.ConsulDefaultCluster, true, false)
|
||||
must.Eq(t, "connect-ingress-foo", task.Name)
|
||||
must.Eq(t, "connect-ingress:foo", string(task.Kind))
|
||||
must.Eq(t, "${attr.consul.version}", task.Constraints[0].LTarget)
|
||||
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
|
||||
must.Eq(t, "host", task.Config["network_mode"])
|
||||
require.Nil(t, task.Lifecycle)
|
||||
})
|
||||
|
||||
t.Run("terminating", func(t *testing.T) {
|
||||
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar", true, false)
|
||||
require.Equal(t, "connect-terminating-bar", task.Name)
|
||||
require.Equal(t, "connect-terminating:bar", string(task.Kind))
|
||||
require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget)
|
||||
require.Equal(t, "host", task.Config["network_mode"])
|
||||
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar",
|
||||
structs.ConsulDefaultCluster, true, false)
|
||||
must.Eq(t, "connect-terminating-bar", task.Name)
|
||||
must.Eq(t, "connect-terminating:bar", string(task.Kind))
|
||||
must.Eq(t, "${attr.consul.version}", task.Constraints[0].LTarget)
|
||||
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
|
||||
must.Eq(t, "host", task.Config["network_mode"])
|
||||
require.Nil(t, task.Lifecycle)
|
||||
})
|
||||
|
||||
// this case can only happen on ENT but gets run in CE code
|
||||
t.Run("terminating nondefault (ENT)", func(t *testing.T) {
|
||||
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar",
|
||||
"nondefault", true, false)
|
||||
must.Eq(t, "connect-terminating-bar", task.Name)
|
||||
must.Eq(t, "connect-terminating:bar", string(task.Kind))
|
||||
must.Eq(t, "${attr.consul.nondefault.version}", task.Constraints[0].LTarget)
|
||||
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
|
||||
must.Eq(t, "host", task.Config["network_mode"])
|
||||
require.Nil(t, task.Lifecycle)
|
||||
})
|
||||
}
|
||||
@@ -715,7 +735,8 @@ func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) {
|
||||
func TestJobEndpointConnect_newConnectGatewayTask_bridge(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1", false, false)
|
||||
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1",
|
||||
structs.ConsulDefaultCluster, false, false)
|
||||
require.NotContains(t, task.Config, "network_mode")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user