e2e: consul namespace tests from nomad ent

(cherry-picked from ent without _ent things)

This is part 2/4 of e2e tests for Consul Namespaces. Took a
first pass at what the parameterized tests can look like, but
only on the ENT side for this PR. Will continue to refactor
in the next PRs.

Also fixes 2 bugs:
 - Config Entries registered by Nomad Server on job registration
   were not getting Namespace set
 - Group level script checks were not getting Namespace set

Those changes will need to be copied back to Nomad OSS.

Nomad OSS + no ACLs (previously, needs refactor)
Nomad ENT + no ACLs (this)
Nomad OSS + ACLs (todo)
Nomad ENT + ACLs (todo)
This commit is contained in:
Seth Hoenig
2021-04-19 11:29:36 -06:00
parent fc8ffe9135
commit 5173a12d81
10 changed files with 530 additions and 86 deletions

View File

@@ -185,16 +185,16 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
serviceID := agentconsul.MakeAllocServiceID(
h.alloc.ID, h.task.Name, service)
sc := newScriptCheck(&scriptCheckConfig{
namespace: h.consulNamespace,
allocID: h.alloc.ID,
taskName: h.task.Name,
check: check,
serviceID: serviceID,
ttlUpdater: h.consul,
driverExec: h.driverExec,
taskEnv: h.taskEnv,
logger: h.logger,
shutdownCh: h.shutdownCh,
consulNamespace: h.consulNamespace,
allocID: h.alloc.ID,
taskName: h.task.Name,
check: check,
serviceID: serviceID,
ttlUpdater: h.consul,
driverExec: h.driverExec,
taskEnv: h.taskEnv,
logger: h.logger,
shutdownCh: h.shutdownCh,
})
if sc != nil {
scriptChecks[sc.id] = sc
@@ -225,16 +225,17 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
serviceID := agentconsul.MakeAllocServiceID(
h.alloc.ID, groupTaskName, service)
sc := newScriptCheck(&scriptCheckConfig{
allocID: h.alloc.ID,
taskName: groupTaskName,
check: check,
serviceID: serviceID,
ttlUpdater: h.consul,
driverExec: h.driverExec,
taskEnv: h.taskEnv,
logger: h.logger,
shutdownCh: h.shutdownCh,
isGroup: true,
consulNamespace: h.consulNamespace,
allocID: h.alloc.ID,
taskName: groupTaskName,
check: check,
serviceID: serviceID,
ttlUpdater: h.consul,
driverExec: h.driverExec,
taskEnv: h.taskEnv,
logger: h.logger,
shutdownCh: h.shutdownCh,
isGroup: true,
})
if sc != nil {
scriptChecks[sc.id] = sc
@@ -277,17 +278,17 @@ type scriptCheck struct {
// scriptCheckConfig is a parameter struct for newScriptCheck
type scriptCheckConfig struct {
allocID string
taskName string
serviceID string
namespace string // consul namespace (TODO: SET)
check *structs.ServiceCheck
ttlUpdater TTLUpdater
driverExec tinterfaces.ScriptExecutor
taskEnv *taskenv.TaskEnv
logger log.Logger
shutdownCh chan struct{}
isGroup bool
allocID string
taskName string
serviceID string
consulNamespace string
check *structs.ServiceCheck
ttlUpdater TTLUpdater
driverExec tinterfaces.ScriptExecutor
taskEnv *taskenv.TaskEnv
logger log.Logger
shutdownCh chan struct{}
isGroup bool
}
// newScriptCheck constructs a scriptCheck. we're only going to
@@ -330,7 +331,7 @@ func newScriptCheck(config *scriptCheckConfig) *scriptCheck {
} else {
sc.id = agentconsul.MakeCheckID(config.serviceID, sc.check)
}
sc.consulNamespace = config.namespace
sc.consulNamespace = config.consulNamespace
return sc
}

View File

@@ -1303,7 +1303,8 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
// UpdateTTL is used to update the TTL of a check. Typically this will only be
// called to heartbeat script checks.
func (c *ServiceClient) UpdateTTL(id, namespace, output, status string) error {
return c.agentAPI.UpdateTTLOpts(id, output, status, &api.QueryOptions{Namespace: normalizeNamespace(namespace)})
ns := normalizeNamespace(namespace)
return c.agentAPI.UpdateTTLOpts(id, output, status, &api.QueryOptions{Namespace: ns})
}
// Shutdown the Consul client. Update running task registrations and deregister

View File

@@ -34,7 +34,7 @@ job "connect_terminating" {
}
env {
PORT = "9001"
PORT = "${NOMAD_PORT_port}"
}
}
}
@@ -64,7 +64,7 @@ job "connect_terminating" {
}
env {
PORT = "9011"
PORT = "${NOMAD_PORT_port}"
}
}
}

View File

@@ -14,10 +14,9 @@ job "template_kv" {
}
task "task-b" {
driver = "docker"
driver = "raw_exec"
config {
image = "busybox:1"
command = "cat"
args = ["local/a.txt"]
}
@@ -34,10 +33,9 @@ job "template_kv" {
# no consul namespace set
task "task-z" {
driver = "docker"
driver = "raw_exec"
config {
image = "busybox:1"
command = "cat"
args = ["local/a.txt"]
}

View File

@@ -2,7 +2,9 @@ package consul
import (
"fmt"
"sort"
capi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper"
@@ -65,3 +67,337 @@ func (tc *ConsulNamespacesE2ETest) TestNamespacesExist(f *framework.F) {
namespaces := e2eutil.ListConsulNamespaces(f.T(), tc.Consul())
require.True(f.T(), helper.CompareSliceSetString(namespaces, append(consulNamespaces, "default")))
}
func (tc *ConsulNamespacesE2ETest) testConsulRegisterGroupServices(f *framework.F, nsA, nsB, nsC, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-group-services"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobGroupServices, jobID, "")
require.Len(f.T(), allocations, 3)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
// Verify services with namespace set are registered into expected namespaces
e2eutil.RequireConsulRegistered(r, c, nsB, "b1", 1)
e2eutil.RequireConsulRegistered(r, c, nsB, "b2", 1)
e2eutil.RequireConsulRegistered(r, c, nsC, "c1", 1)
e2eutil.RequireConsulRegistered(r, c, nsC, "c2", 1)
// Verify services without namespace set are registered into default
e2eutil.RequireConsulRegistered(r, c, nsZ, "z1", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "z2", 1)
// Verify our services are all healthy
e2eutil.RequireConsulStatus(r, c, nsB, "b1", "passing")
e2eutil.RequireConsulStatus(r, c, nsB, "b2", "passing")
e2eutil.RequireConsulStatus(r, c, nsC, "c1", "passing")
e2eutil.RequireConsulStatus(r, c, nsC, "c2", "passing")
e2eutil.RequireConsulStatus(r, c, nsZ, "z1", "passing")
e2eutil.RequireConsulStatus(r, c, nsZ, "z2", "passing")
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
// Verify that services were de-registered from Consul
e2eutil.RequireConsulDeregistered(r, c, nsB, "b1")
e2eutil.RequireConsulDeregistered(r, c, nsB, "b2")
e2eutil.RequireConsulDeregistered(r, c, nsC, "c1")
e2eutil.RequireConsulDeregistered(r, c, nsC, "c2")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "z1")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "z2")
}
func (tc *ConsulNamespacesE2ETest) testConsulRegisterTaskServices(f *framework.F, nsA, nsB, nsC, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-task-services"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobTaskServices, jobID, "")
require.Len(f.T(), allocations, 3)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
// Verify our services were registered into expected namespaces
e2eutil.RequireConsulRegistered(r, c, nsB, "b1", 1)
e2eutil.RequireConsulRegistered(r, c, nsB, "b2", 1)
e2eutil.RequireConsulRegistered(r, c, nsC, "c1", 1)
e2eutil.RequireConsulRegistered(r, c, nsC, "c2", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "z1", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "z2", 1)
// Verify our services are all healthy
e2eutil.RequireConsulStatus(r, c, nsB, "b1", "passing")
e2eutil.RequireConsulStatus(r, c, nsB, "b2", "passing")
e2eutil.RequireConsulStatus(r, c, nsC, "c1", "passing")
e2eutil.RequireConsulStatus(r, c, nsC, "c2", "passing")
e2eutil.RequireConsulStatus(r, c, nsZ, "z1", "passing")
e2eutil.RequireConsulStatus(r, c, nsZ, "z2", "passing")
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
// Verify that services were de-registered from Consul
e2eutil.RequireConsulDeregistered(r, c, nsB, "b1")
e2eutil.RequireConsulDeregistered(r, c, nsB, "b2")
e2eutil.RequireConsulDeregistered(r, c, nsC, "c1")
e2eutil.RequireConsulDeregistered(r, c, nsC, "c2")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "z1")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "z2")
}
func (tc *ConsulNamespacesE2ETest) testConsulTemplateKV(f *framework.F, expB, expZ string) {
t := f.T()
nomadClient := tc.Nomad()
jobID := "cns-template-kv"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs to complete
allocations := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, cnsJobTemplateKV, jobID, "")
require.Len(t, allocations, 2)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsStopped(f.T(), tc.Nomad(), allocIDs)
// Sort allocs by name
sort.Sort(e2eutil.AllocsByName(allocations))
// Check template read from expected namespace when namespace set
textB, err := e2eutil.AllocTaskLogs(allocations[0].ID, "task-b", e2eutil.LogsStdOut)
require.NoError(t, err)
require.Equal(t, expB, textB)
// Check template read from default namespace if no namespace set
textZ, err := e2eutil.AllocTaskLogs(allocations[1].ID, "task-z", e2eutil.LogsStdOut)
require.NoError(t, err)
require.Equal(t, expZ, textZ)
// Stop the job
e2eutil.WaitForJobStopped(t, nomadClient, jobID)
}
func (tc *ConsulNamespacesE2ETest) testConsulConnectSidecars(f *framework.F, nsA, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-connect-sidecars"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobConnectSidecars, jobID, "")
require.Len(f.T(), allocations, 4)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
// Verify services with cns set were registered into expected namespace
e2eutil.RequireConsulRegistered(r, c, nsA, "count-api", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "count-api-sidecar-proxy", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "count-dashboard", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "count-dashboard-sidecar-proxy", 1)
// Verify services without cns set were registered into default
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-api-z", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-api-z-sidecar-proxy", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-dashboard-z", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-dashboard-z-sidecar-proxy", 1)
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
// Verify that services were de-registered from Consul
e2eutil.RequireConsulDeregistered(r, c, nsA, "count-api")
e2eutil.RequireConsulDeregistered(r, c, nsA, "count-api-sidecar-proxy")
e2eutil.RequireConsulDeregistered(r, c, nsA, "count-dashboard")
e2eutil.RequireConsulDeregistered(r, c, nsA, "count-dashboard-sidecar-proxy")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "count-api-z")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "count-api-z-sidecar-proxy")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "count-dashboard-z")
e2eutil.RequireConsulDeregistered(r, c, nsZ, "count-dashboard-z-sidecar-proxy")
}
func (tc *ConsulNamespacesE2ETest) testConsulConnectIngressGateway(f *framework.F, nsA, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-connect-ingress"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobConnectIngress, jobID, "")
require.Len(f.T(), allocations, 4) // 2 x (1 service + 1 gateway)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
// Verify services with cns set were registered into expected namespace
e2eutil.RequireConsulRegistered(r, c, nsA, "my-ingress-service", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "uuid-api", 1)
// Verify services without cns set were registered into default
e2eutil.RequireConsulRegistered(r, c, nsZ, "my-ingress-service-z", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "uuid-api-z", 1)
// Read the config entry of gateway with cns set, checking it exists in expected namespace
ce := e2eutil.ReadConsulConfigEntry(f.T(), c, nsA, "ingress-gateway", "my-ingress-service")
require.Equal(f.T(), nsA, ce.GetNamespace())
// Read the config entry of gateway without cns set, checking it exists in default namespace
ceZ := e2eutil.ReadConsulConfigEntry(f.T(), c, nsZ, "ingress-gateway", "my-ingress-service-z")
require.Equal(f.T(), nsZ, ceZ.GetNamespace())
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
// Remove the config entries
e2eutil.DeleteConsulConfigEntry(f.T(), c, nsA, "ingress-gateway", "my-ingress-service")
e2eutil.DeleteConsulConfigEntry(f.T(), c, nsZ, "ingress-gateway", "my-ingress-service-z")
}
func (tc *ConsulNamespacesE2ETest) testConsulConnectTerminatingGateway(f *framework.F, nsA, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-connect-terminating"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobConnectTerminating, jobID, "")
require.Len(f.T(), allocations, 6) // 2 x (2 services + 1 gateway)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
// Verify services with cns set were registered into expected Consul namespace
e2eutil.RequireConsulRegistered(r, c, nsA, "api-gateway", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "count-api", 1)
e2eutil.RequireConsulRegistered(r, c, nsA, "count-dashboard", 1)
// Verify services without cns set were registered into "default" Consul namespace
e2eutil.RequireConsulRegistered(r, c, nsZ, "api-gateway-z", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-api-z", 1)
e2eutil.RequireConsulRegistered(r, c, nsZ, "count-dashboard-z", 1)
// Read the config entry of gateway with cns set, checking it exists in the expected namespace
ce := e2eutil.ReadConsulConfigEntry(f.T(), c, nsA, "terminating-gateway", "api-gateway")
require.Equal(f.T(), nsA, ce.GetNamespace())
// Read the config entry of gateway without cns set, checking it exists in the "default" namespace
ceZ := e2eutil.ReadConsulConfigEntry(f.T(), c, nsZ, "terminating-gateway", "api-gateway-z")
require.Equal(f.T(), nsZ, ceZ.GetNamespace())
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
// Remove the config entries
e2eutil.DeleteConsulConfigEntry(f.T(), c, nsA, "terminating-gateway", "api-gateway")
e2eutil.DeleteConsulConfigEntry(f.T(), c, nsZ, "terminating-gateway", "api-gateway-z")
}
func (tc *ConsulNamespacesE2ETest) testConsulScriptChecksTask(f *framework.F, nsA, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-script-checks-task"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobScriptChecksTask, jobID, "")
require.Len(f.T(), allocations, 2)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
sort.Sort(e2eutil.AllocsByName(allocations))
allocsWithSetNamespace := allocations[0:1]
allocsWithNoNamespace := allocations[1:2]
// Verify checks with namespace set are set into expected namespace
e2eutil.RequireConsulStatus(r, c, nsA, "service-1a", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, c, nsA, "service-2a", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, c, nsA, "service-3a", capi.HealthCritical)
// Check in warning state becomes healthy after check passes for the service
// with specified Consul namespace
//
// (ensures UpdateTTL is respecting namespace)
_, _, err := exec(nomadClient, allocsWithSetNamespace,
[]string{"/bin/sh", "-c", "touch ${NOMAD_TASK_DIR}/alive-2ab"})
r.NoError(err)
e2eutil.RequireConsulStatus(r, c, nsA, "service-2a", capi.HealthPassing)
// Verify checks without namespace are set in default namespace
e2eutil.RequireConsulStatus(r, c, nsZ, "service-1z", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-2z", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-3z", capi.HealthCritical)
// Check in warning state becomes healthy after check passes for the service
// with specified Consul namespace
//
// (ensures UpdateTTL is respecting namespace)
_, _, errZ := exec(nomadClient, allocsWithNoNamespace,
[]string{"/bin/sh", "-c", "touch ${NOMAD_TASK_DIR}/alive-2zb"})
r.NoError(errZ)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-2z", capi.HealthPassing)
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
}
func (tc *ConsulNamespacesE2ETest) testConsulScriptChecksGroup(f *framework.F, nsA, nsZ string) {
nomadClient := tc.Nomad()
jobID := "cns-script-checks-group"
tc.jobIDs = append(tc.jobIDs, jobID)
// Run job and wait for allocs
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, cnsJobScriptChecksGroup, jobID, "")
require.Len(f.T(), allocations, 2)
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(f.T(), tc.Nomad(), allocIDs)
r := f.Assertions
c := tc.Consul()
sort.Sort(e2eutil.AllocsByName(allocations))
allocsWithSetNamespace := allocations[0:1]
allocsWithNoNamespace := allocations[1:2]
// Verify checks with namespace set are set into expected namespace
e2eutil.RequireConsulStatus(r, c, nsA, "service-1a", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, c, nsA, "service-2a", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, c, nsA, "service-3a", capi.HealthCritical)
// Check in warning state becomes healthy after check passes for the service
// with specified Consul namespace
//
// (ensures UpdateTTL is respecting namespace)
_, _, err := exec(nomadClient, allocsWithSetNamespace,
[]string{"/bin/sh", "-c", "touch /tmp/${NOMAD_ALLOC_ID}-alive-2ab"})
r.NoError(err)
e2eutil.RequireConsulStatus(r, c, nsA, "service-2a", capi.HealthPassing)
// Verify checks were registered into "default" Consul namespace when no
// namespace was specified.
e2eutil.RequireConsulStatus(r, c, nsZ, "service-1z", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-2z", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-3z", capi.HealthCritical)
// Check in warning state becomes healthy after check passes for the service
// with specified Consul namespace
//
// (ensures UpdateTTL is respecting namespace)
_, _, errZ := exec(nomadClient, allocsWithNoNamespace,
[]string{"/bin/sh", "-c", "touch /tmp/${NOMAD_ALLOC_ID}-alive-2zb"})
r.NoError(errZ)
e2eutil.RequireConsulStatus(r, c, nsZ, "service-2z", capi.HealthPassing)
// Stop the job
e2eutil.WaitForJobStopped(f.T(), nomadClient, jobID)
}

View File

@@ -504,17 +504,21 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err
// ConsulConfigsAPI is an abstraction over the consul/api.ConfigEntries API used by
// Nomad Server.
//
// Nomad will only perform write operations on Consul Ingress Gateway Configuration Entries.
// Removing the entries is not particularly safe, given that multiple Nomad clusters
// may be writing to the same config entries, which are global in the Consul scope.
// Nomad will only perform write operations on Consul Ingress/Terminating Gateway
// Configuration Entries. Removing the entries is not yet safe, given that multiple
// Nomad clusters may be writing to the same config entries, which are global in
// the Consul scope. There was a Meta field introduced which Nomad can leverage
// in the future, when Consul no longer supports versions that do not contain the
// field. The Meta field would be used to track which Nomad "owns" the CE.
// https://github.com/hashicorp/nomad/issues/8971
type ConsulConfigsAPI interface {
// SetIngressCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetIngressCE(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error
SetIngressCE(ctx context.Context, namespace, service string, entry *structs.ConsulIngressConfigEntry) error
// SetTerminatingCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetTerminatingCE(ctx context.Context, service string, entry *structs.ConsulTerminatingConfigEntry) error
SetTerminatingCE(ctx context.Context, namespace, service string, entry *structs.ConsulTerminatingConfigEntry) error
// Stop is used to stop additional creations of Configuration Entries. Intended to
// be used on Nomad Server shutdown.
@@ -552,16 +556,14 @@ func (c *consulConfigsAPI) Stop() {
c.stopped = true
}
func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(service, entry))
func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry))
}
func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, service string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(service, entry))
func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry))
}
// also mesh
// setCE will set the Configuration Entry of any type Consul supports.
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry) error {
defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now())
@@ -580,11 +582,11 @@ func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry) err
return err
}
_, _, err := c.configsClient.Set(entry, nil)
_, _, err := c.configsClient.Set(entry, &api.WriteOptions{Namespace: entry.GetNamespace()})
return err
}
func convertIngressCE(service string, entry *structs.ConsulIngressConfigEntry) api.ConfigEntry {
func convertIngressCE(namespace, service string, entry *structs.ConsulIngressConfigEntry) api.ConfigEntry {
var listeners []api.IngressListener = nil
for _, listener := range entry.Listeners {
var services []api.IngressService = nil
@@ -607,6 +609,7 @@ func convertIngressCE(service string, entry *structs.ConsulIngressConfigEntry) a
}
return &api.IngressGatewayConfigEntry{
Namespace: namespace,
Kind: api.IngressGateway,
Name: service,
TLS: api.GatewayTLSConfig{Enabled: tlsEnabled},
@@ -614,7 +617,7 @@ func convertIngressCE(service string, entry *structs.ConsulIngressConfigEntry) a
}
}
func convertTerminatingCE(service string, entry *structs.ConsulTerminatingConfigEntry) api.ConfigEntry {
func convertTerminatingCE(namespace, service string, entry *structs.ConsulTerminatingConfigEntry) api.ConfigEntry {
var linked []api.LinkedService = nil
for _, s := range entry.Services {
linked = append(linked, api.LinkedService{
@@ -626,8 +629,9 @@ func convertTerminatingCE(service string, entry *structs.ConsulTerminatingConfig
})
}
return &api.TerminatingGatewayConfigEntry{
Kind: api.TerminatingGateway,
Name: service,
Services: linked,
Namespace: namespace,
Kind: api.TerminatingGateway,
Name: service,
Services: linked,
}
}

View File

@@ -41,29 +41,32 @@ func TestConsulConfigsAPI_SetCE(t *testing.T) {
ctx := context.Background()
// existing behavior is no set namespace
consulNamespace := ""
ingressCE := new(structs.ConsulIngressConfigEntry)
t.Run("ingress ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetIngressCE(ctx, "ig", ingressCE)
return c.SetIngressCE(ctx, consulNamespace, "ig", ingressCE)
})
})
t.Run("ingress fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetIngressCE(ctx, "ig", ingressCE)
return c.SetIngressCE(ctx, consulNamespace, "ig", ingressCE)
})
})
terminatingCE := new(structs.ConsulTerminatingConfigEntry)
t.Run("terminating ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(ctx, "tg", terminatingCE)
return c.SetTerminatingCE(ctx, consulNamespace, "tg", terminatingCE)
})
})
t.Run("terminating fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(ctx, "tg", terminatingCE)
return c.SetTerminatingCE(ctx, consulNamespace, "tg", terminatingCE)
})
})

View File

@@ -281,8 +281,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
// Create or Update Consul Configuration Entries defined in the job. For now
// Nomad only supports Configuration Entries of type "ingress-gateway" for managing
// Consul Connect Ingress Gateway tasks derived from TaskGroup services.
// Nomad only supports Configuration Entries types
// - "ingress-gateway" for managing Ingress Gateways
// - "terminating-gateway" for managing Terminating Gateways
//
// This is done as a blocking operation that prevents the job from being
// submitted if the configuration entries cannot be set in Consul.
@@ -290,18 +291,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Every job update will re-write the Configuration Entry into Consul.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
entries := args.Job.ConfigEntries()
for service, entry := range entries.Ingress {
if err := j.srv.consulConfigEntries.SetIngressCE(ctx, service, entry); err != nil {
return err
for ns, entries := range args.Job.ConfigEntries() {
for service, entry := range entries.Ingress {
if errCE := j.srv.consulConfigEntries.SetIngressCE(ctx, ns, service, entry); errCE != nil {
return errCE
}
}
for service, entry := range entries.Terminating {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(ctx, ns, service, entry); errCE != nil {
return errCE
}
}
}
for service, entry := range entries.Terminating {
if err := j.srv.consulConfigEntries.SetTerminatingCE(ctx, service, entry); err != nil {
return err
}
}
// also mesh
// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.

View File

@@ -1,33 +1,39 @@
package structs
// ConsulConfigEntries represents Consul ConfigEntry definitions from a job.
// ConsulConfigEntries represents Consul ConfigEntry definitions from a job for
// a single Consul namespace.
type ConsulConfigEntries struct {
Ingress map[string]*ConsulIngressConfigEntry
Terminating map[string]*ConsulTerminatingConfigEntry
// Mesh later
}
// ConfigEntries accumulates the Consul Configuration Entries defined in task groups
// of j.
func (j *Job) ConfigEntries() *ConsulConfigEntries {
entries := &ConsulConfigEntries{
Ingress: make(map[string]*ConsulIngressConfigEntry),
Terminating: make(map[string]*ConsulTerminatingConfigEntry),
// Mesh later
}
// of j, organized by Consul namespace.
func (j *Job) ConfigEntries() map[string]*ConsulConfigEntries {
collection := make(map[string]*ConsulConfigEntries)
for _, tg := range j.TaskGroups {
// accumulate config entries by namespace
ns := tg.Consul.GetNamespace()
if _, exists := collection[ns]; !exists {
collection[ns] = &ConsulConfigEntries{
Ingress: make(map[string]*ConsulIngressConfigEntry),
Terminating: make(map[string]*ConsulTerminatingConfigEntry),
}
}
for _, service := range tg.Services {
if service.Connect.IsGateway() {
gateway := service.Connect.Gateway
if ig := gateway.Ingress; ig != nil {
entries.Ingress[service.Name] = ig
} else if tg := gateway.Terminating; tg != nil {
entries.Terminating[service.Name] = tg
} // mesh later
collection[ns].Ingress[service.Name] = ig
} else if term := gateway.Terminating; term != nil {
collection[ns].Terminating[service.Name] = term
}
}
}
}
return entries
return collection
}

View File

@@ -0,0 +1,93 @@
// +build !ent
package structs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestJob_ConfigEntries(t *testing.T) {
t.Parallel()
ingress := &ConsulConnect{
Gateway: &ConsulGateway{
Ingress: new(ConsulIngressConfigEntry),
},
}
terminating := &ConsulConnect{
Gateway: &ConsulGateway{
Terminating: new(ConsulTerminatingConfigEntry),
},
}
j := &Job{
TaskGroups: []*TaskGroup{{
Name: "group1",
Consul: nil,
Services: []*Service{{
Name: "group1-service1",
Connect: ingress,
}, {
Name: "group1-service2",
Connect: nil,
}, {
Name: "group1-service3",
Connect: terminating,
}},
}, {
Name: "group2",
Consul: nil,
Services: []*Service{{
Name: "group2-service1",
Connect: ingress,
}},
}, {
Name: "group3",
Consul: &Consul{Namespace: "apple"},
Services: []*Service{{
Name: "group3-service1",
Connect: ingress,
}},
}, {
Name: "group4",
Consul: &Consul{Namespace: "apple"},
Services: []*Service{{
Name: "group4-service1",
Connect: ingress,
}, {
Name: "group4-service2",
Connect: terminating,
}},
}, {
Name: "group5",
Consul: &Consul{Namespace: "banana"},
Services: []*Service{{
Name: "group5-service1",
Connect: ingress,
}},
}},
}
exp := map[string]*ConsulConfigEntries{
// in OSS, consul namespace is not supported
"": &ConsulConfigEntries{
Ingress: map[string]*ConsulIngressConfigEntry{
"group1-service1": new(ConsulIngressConfigEntry),
"group2-service1": new(ConsulIngressConfigEntry),
"group3-service1": new(ConsulIngressConfigEntry),
"group4-service1": new(ConsulIngressConfigEntry),
"group5-service1": new(ConsulIngressConfigEntry),
},
Terminating: map[string]*ConsulTerminatingConfigEntry{
"group1-service3": new(ConsulTerminatingConfigEntry),
"group4-service2": new(ConsulTerminatingConfigEntry),
},
},
}
entries := j.ConfigEntries()
require.EqualValues(t, exp, entries)
}