mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Merge pull request #12052 from hashicorp/b-taskrunner-track-deregistered-call
client: track service deregister call so it's only called once.
This commit is contained in:
@@ -122,11 +122,11 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
|
||||
|
||||
// Assert consul was cleaned up:
|
||||
// 1 removal during prekill
|
||||
// 1 removal during exited
|
||||
// 1 removal during stop
|
||||
// - removal during exited is de-duped due to prekill
|
||||
// - removal during stop is de-duped due to prekill
|
||||
// 1 removal group during stop
|
||||
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
|
||||
require.Len(t, consulOps, 4)
|
||||
require.Len(t, consulOps, 2)
|
||||
for _, op := range consulOps {
|
||||
require.Equal(t, "remove", op.Op)
|
||||
}
|
||||
|
||||
@@ -54,6 +54,10 @@ type serviceHook struct {
|
||||
// fields required in other lifecycle funcs
|
||||
initialRegistration bool
|
||||
|
||||
// deregistered tracks whether deregister() has previously been called, so
|
||||
// we do not call this multiple times for a single task when not needed.
|
||||
deregistered bool
|
||||
|
||||
// Since Update() may be called concurrently with any other hook all
|
||||
// hook methods must be fully serialized
|
||||
mu sync.Mutex
|
||||
@@ -96,6 +100,9 @@ func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststa
|
||||
h.taskEnv = req.TaskEnv
|
||||
h.initialRegistration = true
|
||||
|
||||
// Ensure deregistered is unset.
|
||||
h.deregistered = false
|
||||
|
||||
// Create task services struct with request's driver metadata
|
||||
workloadServices := h.getWorkloadServices()
|
||||
|
||||
@@ -171,11 +178,12 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
|
||||
|
||||
// deregister services from Consul.
|
||||
func (h *serviceHook) deregister() {
|
||||
if len(h.services) > 0 {
|
||||
if len(h.services) > 0 && !h.deregistered {
|
||||
workloadServices := h.getWorkloadServices()
|
||||
h.consulServices.RemoveWorkload(workloadServices)
|
||||
}
|
||||
h.initialRegistration = false
|
||||
h.deregistered = true
|
||||
}
|
||||
|
||||
func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
|
||||
|
||||
@@ -51,3 +51,53 @@ func TestUpdate_beforePoststart(t *testing.T) {
|
||||
require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{}))
|
||||
require.Len(t, c.GetOps(), 6)
|
||||
}
|
||||
|
||||
func Test_serviceHook_multipleDeRegisterCall(t *testing.T) {
|
||||
|
||||
alloc := mock.Alloc()
|
||||
logger := testlog.HCLogger(t)
|
||||
c := consul.NewMockConsulServiceClient(t, logger)
|
||||
|
||||
hook := newServiceHook(serviceHookConfig{
|
||||
alloc: alloc,
|
||||
task: alloc.LookupTask("web"),
|
||||
consulServices: c,
|
||||
logger: logger,
|
||||
})
|
||||
|
||||
// Add a registration, as we would in normal operation.
|
||||
require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{}))
|
||||
require.Len(t, c.GetOps(), 1)
|
||||
|
||||
// Call all three deregister backed functions in a row. Ensure the number
|
||||
// of operations does not increase and that the second is always a remove.
|
||||
require.NoError(t, hook.Exited(context.Background(), &interfaces.TaskExitedRequest{}, &interfaces.TaskExitedResponse{}))
|
||||
require.Len(t, c.GetOps(), 2)
|
||||
require.Equal(t, c.GetOps()[1].Op, "remove")
|
||||
|
||||
require.NoError(t, hook.PreKilling(context.Background(), &interfaces.TaskPreKillRequest{}, &interfaces.TaskPreKillResponse{}))
|
||||
require.Len(t, c.GetOps(), 2)
|
||||
require.Equal(t, c.GetOps()[1].Op, "remove")
|
||||
|
||||
require.NoError(t, hook.Stop(context.Background(), &interfaces.TaskStopRequest{}, &interfaces.TaskStopResponse{}))
|
||||
require.Len(t, c.GetOps(), 2)
|
||||
require.Equal(t, c.GetOps()[1].Op, "remove")
|
||||
|
||||
// Now we act like a restart.
|
||||
require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{}))
|
||||
require.Len(t, c.GetOps(), 3)
|
||||
require.Equal(t, c.GetOps()[2].Op, "add")
|
||||
|
||||
// Go again through the process or shutting down.
|
||||
require.NoError(t, hook.Exited(context.Background(), &interfaces.TaskExitedRequest{}, &interfaces.TaskExitedResponse{}))
|
||||
require.Len(t, c.GetOps(), 4)
|
||||
require.Equal(t, c.GetOps()[3].Op, "remove")
|
||||
|
||||
require.NoError(t, hook.PreKilling(context.Background(), &interfaces.TaskPreKillRequest{}, &interfaces.TaskPreKillResponse{}))
|
||||
require.Len(t, c.GetOps(), 4)
|
||||
require.Equal(t, c.GetOps()[3].Op, "remove")
|
||||
|
||||
require.NoError(t, hook.Stop(context.Background(), &interfaces.TaskStopRequest{}, &interfaces.TaskStopResponse{}))
|
||||
require.Len(t, c.GetOps(), 4)
|
||||
require.Equal(t, c.GetOps()[3].Op, "remove")
|
||||
}
|
||||
|
||||
@@ -2481,7 +2481,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
||||
|
||||
consul := conf.Consul.(*consulapi.MockConsulServiceClient)
|
||||
consulOps := consul.GetOps()
|
||||
require.Len(t, consulOps, 5)
|
||||
require.Len(t, consulOps, 4)
|
||||
|
||||
// Initial add
|
||||
require.Equal(t, "add", consulOps[0].Op)
|
||||
@@ -2494,9 +2494,6 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
||||
|
||||
// Removing entries on retry
|
||||
require.Equal(t, "remove", consulOps[3].Op)
|
||||
|
||||
// Removing entries on stop
|
||||
require.Equal(t, "remove", consulOps[4].Op)
|
||||
}
|
||||
|
||||
// testWaitForTaskToStart waits for the task to be running or fails the test
|
||||
|
||||
Reference in New Issue
Block a user