From 72f411c9866d7b93f0c139e67c5e0a34310848e5 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 11 Feb 2022 09:29:38 +0100 Subject: [PATCH] client: track service deregister call so it's only called once. In certain task lifecycles the taskrunner service deregister call could be called three times for a task that is exiting. Whilst each hook caller of deregister has its own purpose, we should try and ensure it is only called once during the shutdown lifecycle of a task. This change therefore tracks when deregister has been called, so that subsequent calls are noop. In the event the task is restarting, the deregister value is reset to ensure proper operation. --- client/allocrunner/alloc_runner_unix_test.go | 6 +-- client/allocrunner/taskrunner/service_hook.go | 10 +++- .../taskrunner/service_hook_test.go | 50 +++++++++++++++++++ .../taskrunner/task_runner_test.go | 5 +- 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index b21d3fecb..41b5dabc8 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -122,11 +122,11 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { // Assert consul was cleaned up: // 1 removal during prekill - // 1 removal during exited - // 1 removal during stop + // - removal during exited is de-duped due to prekill + // - removal during stop is de-duped due to prekill // 1 removal group during stop consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps() - require.Len(t, consulOps, 4) + require.Len(t, consulOps, 2) for _, op := range consulOps { require.Equal(t, "remove", op.Op) } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 95bf1d214..ecc022c40 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -54,6 +54,10 @@ type serviceHook struct { // fields required in other lifecycle funcs initialRegistration bool + // deregistered tracks whether deregister() has previously been called, so + // we do not call this multiple times for a single task when not needed. + deregistered bool + // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized mu sync.Mutex @@ -96,6 +100,9 @@ func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststa h.taskEnv = req.TaskEnv h.initialRegistration = true + // Ensure deregistered is unset. + h.deregistered = false + // Create task services struct with request's driver metadata workloadServices := h.getWorkloadServices() @@ -171,11 +178,12 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in // deregister services from Consul. func (h *serviceHook) deregister() { - if len(h.services) > 0 { + if len(h.services) > 0 && !h.deregistered { workloadServices := h.getWorkloadServices() h.consulServices.RemoveWorkload(workloadServices) } h.initialRegistration = false + h.deregistered = true } func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { diff --git a/client/allocrunner/taskrunner/service_hook_test.go b/client/allocrunner/taskrunner/service_hook_test.go index ad88fabd3..bdae6bfd0 100644 --- a/client/allocrunner/taskrunner/service_hook_test.go +++ b/client/allocrunner/taskrunner/service_hook_test.go @@ -51,3 +51,53 @@ func TestUpdate_beforePoststart(t *testing.T) { require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 6) } + +func Test_serviceHook_multipleDeRegisterCall(t *testing.T) { + + alloc := mock.Alloc() + logger := testlog.HCLogger(t) + c := consul.NewMockConsulServiceClient(t, logger) + + hook := newServiceHook(serviceHookConfig{ + alloc: alloc, + task: alloc.LookupTask("web"), + consulServices: c, + logger: logger, + }) + + // Add a registration, as we would in normal operation. + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + require.Len(t, c.GetOps(), 1) + + // Call all three deregister backed functions in a row. Ensure the number + // of operations does not increase and that the second is always a remove. + require.NoError(t, hook.Exited(context.Background(), &interfaces.TaskExitedRequest{}, &interfaces.TaskExitedResponse{})) + require.Len(t, c.GetOps(), 2) + require.Equal(t, c.GetOps()[1].Op, "remove") + + require.NoError(t, hook.PreKilling(context.Background(), &interfaces.TaskPreKillRequest{}, &interfaces.TaskPreKillResponse{})) + require.Len(t, c.GetOps(), 2) + require.Equal(t, c.GetOps()[1].Op, "remove") + + require.NoError(t, hook.Stop(context.Background(), &interfaces.TaskStopRequest{}, &interfaces.TaskStopResponse{})) + require.Len(t, c.GetOps(), 2) + require.Equal(t, c.GetOps()[1].Op, "remove") + + // Now we act like a restart. + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + require.Len(t, c.GetOps(), 3) + require.Equal(t, c.GetOps()[2].Op, "add") + + // Go again through the process or shutting down. + require.NoError(t, hook.Exited(context.Background(), &interfaces.TaskExitedRequest{}, &interfaces.TaskExitedResponse{})) + require.Len(t, c.GetOps(), 4) + require.Equal(t, c.GetOps()[3].Op, "remove") + + require.NoError(t, hook.PreKilling(context.Background(), &interfaces.TaskPreKillRequest{}, &interfaces.TaskPreKillResponse{})) + require.Len(t, c.GetOps(), 4) + require.Equal(t, c.GetOps()[3].Op, "remove") + + require.NoError(t, hook.Stop(context.Background(), &interfaces.TaskStopRequest{}, &interfaces.TaskStopResponse{})) + require.Len(t, c.GetOps(), 4) + require.Equal(t, c.GetOps()[3].Op, "remove") +} diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 9b60458b4..77741c802 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -2481,7 +2481,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { consul := conf.Consul.(*consulapi.MockConsulServiceClient) consulOps := consul.GetOps() - require.Len(t, consulOps, 5) + require.Len(t, consulOps, 4) // Initial add require.Equal(t, "add", consulOps[0].Op) @@ -2494,9 +2494,6 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { // Removing entries on retry require.Equal(t, "remove", consulOps[3].Op) - - // Removing entries on stop - require.Equal(t, "remove", consulOps[4].Op) } // testWaitForTaskToStart waits for the task to be running or fails the test