From 2470bf9128f2689c9916e826cd91fd3254490b22 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Thu, 26 Jan 2023 08:17:57 -0600 Subject: [PATCH] nsd: block on removal of services (#15862) * nsd: block on removal of services This PR uses a WaitGroup to ensure workload removals are complete before returning from ServiceRegistrationHandler.RemoveWorkload of the nomad service provider. The de-registration of individual services still occurs asynchronously, but we must block on the parent removal call so that we do not race with further operations on the same set of services - e.g. in the case of a task restart where we de-register and then re-register the services in quick succession. Fixes #15032 * nsd: add e2e test for initial failing check and restart --- .changelog/15862.txt | 3 + client/allocrunner/group_service_hook.go | 2 +- client/serviceregistration/nsd/nsd.go | 16 ++++- .../input/checks_task_restart_helper.nomad | 46 +++++++++++++ .../input/checks_task_restart_main.nomad | 46 +++++++++++++ e2e/servicediscovery/nomad_checks_test.go | 68 +++++++++++++++++++ .../service_discovery_test.go | 1 + 7 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 .changelog/15862.txt create mode 100644 e2e/servicediscovery/input/checks_task_restart_helper.nomad create mode 100644 e2e/servicediscovery/input/checks_task_restart_main.nomad diff --git a/.changelog/15862.txt b/.changelog/15862.txt new file mode 100644 index 000000000..d4a428614 --- /dev/null +++ b/.changelog/15862.txt @@ -0,0 +1,3 @@ +```release-note:bug +services: Fixed a bug where services would fail to register if task initially fails +``` diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 7ce96bf6b..e350f7773 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -229,7 +229,7 @@ func (h *groupServiceHook) Postrun() error { return nil } -// deregister services from Consul.
+// deregister services from Consul/Nomad service provider. func (h *groupServiceHook) deregister() { if len(h.services) > 0 { workloadServices := h.getWorkloadServices() diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go index 1138c4458..7ebc5c149 100644 --- a/client/serviceregistration/nsd/nsd.go +++ b/client/serviceregistration/nsd/nsd.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" @@ -136,13 +137,24 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat // enabled. This covers situations where the feature is disabled, yet still has // allocations which, when stopped need their registrations removed. func (s *ServiceRegistrationHandler) RemoveWorkload(workload *serviceregistration.WorkloadServices) { + wg := new(sync.WaitGroup) + wg.Add(len(workload.Services)) + for _, serviceSpec := range workload.Services { - go s.removeWorkload(workload, serviceSpec) + go s.removeWorkload(wg, workload, serviceSpec) } + + // wait for all workload removals to complete + wg.Wait() } func (s *ServiceRegistrationHandler) removeWorkload( - workload *serviceregistration.WorkloadServices, serviceSpec *structs.Service) { + wg *sync.WaitGroup, + workload *serviceregistration.WorkloadServices, + serviceSpec *structs.Service, +) { + // unblock wait group when we are done + defer wg.Done() // Stop check watcher for _, service := range workload.Services { diff --git a/e2e/servicediscovery/input/checks_task_restart_helper.nomad b/e2e/servicediscovery/input/checks_task_restart_helper.nomad new file mode 100644 index 000000000..83539d2e1 --- /dev/null +++ b/e2e/servicediscovery/input/checks_task_restart_helper.nomad @@ -0,0 +1,46 @@ +variable "nodeID" { + type = string +} + +variable "cmd" { + type = string +} + +variable "delay" { + type = string +} + +job "checks_task_restart_helper" { + datacenters = ["dc1"] + type = "batch" + + group 
"group" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + constraint { + attribute = "${node.unique_id}" + value = "${var.nodeID}" + } + + reschedule { + attempts = 0 + unlimited = false + } + + task "touch" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "sleep ${var.delay} && ${var.cmd} /tmp/nsd-checks-task-restart-test.txt"] + } + resources { + cpu = 50 + memory = 32 + } + } + } +} diff --git a/e2e/servicediscovery/input/checks_task_restart_main.nomad b/e2e/servicediscovery/input/checks_task_restart_main.nomad new file mode 100644 index 000000000..580c41401 --- /dev/null +++ b/e2e/servicediscovery/input/checks_task_restart_main.nomad @@ -0,0 +1,46 @@ +job "checks_task_restart" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "group" { + network { + mode = "host" + port "http" {} + } + + service { + provider = "nomad" + name = "nsd-checks-task-restart-test" + port = "http" + check { + name = "alive" + type = "http" + path = "/nsd-checks-task-restart-test.txt" + interval = "2s" + timeout = "1s" + check_restart { + limit = 10 + grace = "1s" + } + } + } + + task "python" { + driver = "raw_exec" + user = "nobody" + config { + command = "python3" + args = ["-m", "http.server", "${NOMAD_PORT_http}", "--directory", "/tmp"] + } + resources { + cpu = 50 + memory = 64 + } + } + } +} diff --git a/e2e/servicediscovery/nomad_checks_test.go b/e2e/servicediscovery/nomad_checks_test.go index e72b6fb0f..3be9a6f20 100644 --- a/e2e/servicediscovery/nomad_checks_test.go +++ b/e2e/servicediscovery/nomad_checks_test.go @@ -9,7 +9,10 @@ import ( "github.com/hashicorp/nomad/e2e/e2eutil" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" "github.com/stretchr/testify/require" ) @@ -100,3 +103,68 @@ func testChecksSad(t *testing.T) { 
return strings.Contains(output, `

Error code explanation: HTTPStatus.NOT_IMPLEMENTED - Server does not support this operation.

`) }, 5*time.Second, 200*time.Millisecond) } + +func testChecksServiceReRegisterAfterCheckRestart(t *testing.T) { + const ( + jobChecksAfterRestartMain = "./input/checks_task_restart_main.nomad" + jobChecksAfterRestartHelper = "./input/checks_task_restart_helper.nomad" + ) + + nomadClient := e2eutil.NomadClient(t) + + idJobMain := "nsd-check-restart-services-" + uuid.Short() + idJobHelper := "nsd-check-restart-services-helper-" + uuid.Short() + jobIDs := []string{idJobMain, idJobHelper} + + // Defer a cleanup function to remove the job. This will trigger if the + // test fails, unless the cancel function is called. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer e2eutil.CleanupJobsAndGCWithContext(t, ctx, &jobIDs) + + // register the main job + allocStubs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, jobChecksAfterRestartMain, idJobMain, "") + must.Len(t, 1, allocStubs) + + // wait for task restart due to failing health check + must.Wait(t, wait.InitialSuccess( + wait.BoolFunc(func() bool { + allocEvents, err := e2eutil.AllocTaskEventsForJob(idJobMain, "") + if err != nil { + t.Log("failed to get task events for job", idJobMain, err) + return false + } + for _, events := range allocEvents { + for _, event := range events { + if event["Type"] == "Restarting" { + return true + } + } + } + return false + }), + wait.Timeout(30*time.Second), + wait.Gap(3*time.Second), + )) + + runHelper := func(command string) { + vars := []string{"-var", "nodeID=" + allocStubs[0].NodeID, "-var", "cmd=" + command, "-var", "delay=3s"} + err := e2eutil.RegisterWithArgs(idJobHelper, jobChecksAfterRestartHelper, vars...)
+ test.NoError(t, err) + } + + // register helper job, triggering check to start passing + runHelper("touch") + defer func() { + runHelper("rm") + }() + + // wait for main task to become healthy + e2eutil.WaitForAllocStatus(t, nomadClient, allocStubs[0].ID, structs.AllocClientStatusRunning) + + // finally assert we have services + services := nomadClient.Services() + serviceStubs, _, err := services.Get("nsd-checks-task-restart-test", nil) + must.NoError(t, err) + must.Len(t, 1, serviceStubs) +} diff --git a/e2e/servicediscovery/service_discovery_test.go b/e2e/servicediscovery/service_discovery_test.go index 00596895a..0fd863562 100644 --- a/e2e/servicediscovery/service_discovery_test.go +++ b/e2e/servicediscovery/service_discovery_test.go @@ -45,6 +45,7 @@ func TestServiceDiscovery(t *testing.T) { t.Run("TestServiceDiscovery_SimpleLoadBalancing", testSimpleLoadBalancing) t.Run("TestServiceDiscovery_ChecksHappy", testChecksHappy) t.Run("TestServiceDiscovery_ChecksSad", testChecksSad) + t.Run("TestServiceDiscovery_ServiceRegisterAfterCheckRestart", testChecksServiceReRegisterAfterCheckRestart) } // testMultiProvider tests service discovery where multi providers are used