mirror of
https://github.com/kemko/nomad.git
synced 2026-01-03 08:55:43 +03:00
Avoid de-registering slowly restored services
When a Nomad client restarts or is upgraded, Nomad restores state from the running tasks and starts the sync loop. If the sync loop runs early, it may deregister services from Consul prematurely, even when Consul reports the running service as healthy. This is not ideal, as re-registering the service means potentially waiting a whole service health check interval before the service is declared healthy again. We attempt to mitigate this by introducing an initialization probation period. During this time, we only deregister services and checks that were explicitly deregistered, and leave unrecognized ones alone. This serves as a grace period for the restore to complete, or for operators to recover should they realize they restored with the wrong Nomad data directory.
This commit is contained in:
@@ -68,6 +68,11 @@ const (
|
||||
|
||||
// ServiceTagSerf is the tag assigned to Serf services
|
||||
ServiceTagSerf = "serf"
|
||||
|
||||
// deregisterProbationPeriod is the initialization period where
|
||||
// services registered in Consul but not in Nomad don't get deregistered,
|
||||
// to allow for nomad restoring tasks
|
||||
deregisterProbationPeriod = 10 * time.Minute
|
||||
)
|
||||
|
||||
// CatalogAPI is the consul/api.Catalog API used by Nomad.
|
||||
@@ -230,6 +235,9 @@ type ServiceClient struct {
|
||||
scripts map[string]*scriptCheck
|
||||
runningScripts map[string]*scriptHandle
|
||||
|
||||
explicitlyDeregisteredServices map[string]bool
|
||||
explicitlyDeregisteredChecks map[string]bool
|
||||
|
||||
// allocRegistrations stores the services and checks that are registered
|
||||
// with Consul by allocation ID.
|
||||
allocRegistrations map[string]*AllocRegistration
|
||||
@@ -245,6 +253,11 @@ type ServiceClient struct {
|
||||
// atomics.
|
||||
seen int32
|
||||
|
||||
// deregisterProbationExpiry is the time before which consul sync shouldn't deregister
|
||||
// unknown services.
|
||||
// Used to mitigate risk of deleting restored services upon client restart.
|
||||
deregisterProbationExpiry time.Time
|
||||
|
||||
// checkWatcher restarts checks that are unhealthy.
|
||||
checkWatcher *checkWatcher
|
||||
|
||||
@@ -260,24 +273,27 @@ type ServiceClient struct {
|
||||
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
|
||||
logger = logger.ResetNamed("consul.sync")
|
||||
return &ServiceClient{
|
||||
client: consulClient,
|
||||
logger: logger,
|
||||
retryInterval: defaultRetryInterval,
|
||||
maxRetryInterval: defaultMaxRetryInterval,
|
||||
periodicInterval: defaultPeriodicInterval,
|
||||
exitCh: make(chan struct{}),
|
||||
shutdownCh: make(chan struct{}),
|
||||
shutdownWait: defaultShutdownWait,
|
||||
opCh: make(chan *operations, 8),
|
||||
services: make(map[string]*api.AgentServiceRegistration),
|
||||
checks: make(map[string]*api.AgentCheckRegistration),
|
||||
scripts: make(map[string]*scriptCheck),
|
||||
runningScripts: make(map[string]*scriptHandle),
|
||||
allocRegistrations: make(map[string]*AllocRegistration),
|
||||
agentServices: make(map[string]struct{}),
|
||||
agentChecks: make(map[string]struct{}),
|
||||
checkWatcher: newCheckWatcher(logger, consulClient),
|
||||
isClientAgent: isNomadClient,
|
||||
client: consulClient,
|
||||
logger: logger,
|
||||
retryInterval: defaultRetryInterval,
|
||||
maxRetryInterval: defaultMaxRetryInterval,
|
||||
periodicInterval: defaultPeriodicInterval,
|
||||
exitCh: make(chan struct{}),
|
||||
shutdownCh: make(chan struct{}),
|
||||
shutdownWait: defaultShutdownWait,
|
||||
opCh: make(chan *operations, 8),
|
||||
services: make(map[string]*api.AgentServiceRegistration),
|
||||
checks: make(map[string]*api.AgentCheckRegistration),
|
||||
scripts: make(map[string]*scriptCheck),
|
||||
runningScripts: make(map[string]*scriptHandle),
|
||||
explicitlyDeregisteredServices: make(map[string]bool),
|
||||
explicitlyDeregisteredChecks: make(map[string]bool),
|
||||
allocRegistrations: make(map[string]*AllocRegistration),
|
||||
agentServices: make(map[string]struct{}),
|
||||
agentChecks: make(map[string]struct{}),
|
||||
checkWatcher: newCheckWatcher(logger, consulClient),
|
||||
isClientAgent: isNomadClient,
|
||||
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,6 +388,9 @@ INIT:
|
||||
failures = 0
|
||||
}
|
||||
|
||||
// on successful sync, clear deregistered consul entities
|
||||
c.clearExplicitlyDeregistered()
|
||||
|
||||
// Reset timer to periodic interval to periodically
|
||||
// reconcile with Consul
|
||||
if !retryTimer.Stop() {
|
||||
@@ -407,6 +426,11 @@ func (c *ServiceClient) commit(ops *operations) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ServiceClient) clearExplicitlyDeregistered() {
|
||||
c.explicitlyDeregisteredServices = map[string]bool{}
|
||||
c.explicitlyDeregisteredChecks = map[string]bool{}
|
||||
}
|
||||
|
||||
// merge registrations into state map prior to sync'ing with Consul
|
||||
func (c *ServiceClient) merge(ops *operations) {
|
||||
for _, s := range ops.regServices {
|
||||
@@ -420,6 +444,7 @@ func (c *ServiceClient) merge(ops *operations) {
|
||||
}
|
||||
for _, sid := range ops.deregServices {
|
||||
delete(c.services, sid)
|
||||
c.explicitlyDeregisteredServices[sid] = true
|
||||
}
|
||||
for _, cid := range ops.deregChecks {
|
||||
if script, ok := c.runningScripts[cid]; ok {
|
||||
@@ -428,6 +453,7 @@ func (c *ServiceClient) merge(ops *operations) {
|
||||
delete(c.runningScripts, cid)
|
||||
}
|
||||
delete(c.checks, cid)
|
||||
c.explicitlyDeregisteredChecks[cid] = true
|
||||
}
|
||||
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
|
||||
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
|
||||
@@ -450,6 +476,8 @@ func (c *ServiceClient) sync() error {
|
||||
return fmt.Errorf("error querying Consul checks: %v", err)
|
||||
}
|
||||
|
||||
inProbation := time.Now().Before(c.deregisterProbationExpiry)
|
||||
|
||||
// Remove Nomad services in Consul but unknown locally
|
||||
for id := range consulServices {
|
||||
if _, ok := c.services[id]; ok {
|
||||
@@ -466,6 +494,11 @@ func (c *ServiceClient) sync() error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Ignore unknown services during probation
|
||||
if inProbation && !c.explicitlyDeregisteredServices[id] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Unknown Nomad managed service; kill
|
||||
if err := c.client.ServiceDeregister(id); err != nil {
|
||||
if isOldNomadService(id) {
|
||||
@@ -518,6 +551,11 @@ func (c *ServiceClient) sync() error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Ignore unknown checks during probation
|
||||
if inProbation && !c.explicitlyDeregisteredChecks[id] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Unknown Nomad managed check; remove
|
||||
if err := c.client.CheckDeregister(id); err != nil {
|
||||
if isOldNomadService(check.ServiceID) {
|
||||
|
||||
@@ -107,7 +107,11 @@ func (t *testFakeCtx) syncOnce() error {
|
||||
select {
|
||||
case ops := <-t.ServiceClient.opCh:
|
||||
t.ServiceClient.merge(ops)
|
||||
return t.ServiceClient.sync()
|
||||
err := t.ServiceClient.sync()
|
||||
if err == nil {
|
||||
t.ServiceClient.clearExplicitlyDeregistered()
|
||||
}
|
||||
return err
|
||||
default:
|
||||
return errNoOps
|
||||
}
|
||||
@@ -118,8 +122,13 @@ func (t *testFakeCtx) syncOnce() error {
|
||||
func setupFake(t *testing.T) *testFakeCtx {
|
||||
fc := NewMockAgent()
|
||||
tt := testTask()
|
||||
|
||||
// by default start fake client being out of probation
|
||||
sc := NewServiceClient(fc, testlog.HCLogger(t), true)
|
||||
sc.deregisterProbationExpiry = time.Now().Add(-1 * time.Minute)
|
||||
|
||||
return &testFakeCtx{
|
||||
ServiceClient: NewServiceClient(fc, testlog.HCLogger(t), true),
|
||||
ServiceClient: sc,
|
||||
FakeConsul: fc,
|
||||
Task: tt,
|
||||
MockExec: tt.DriverExec.(*mockExec),
|
||||
@@ -1676,3 +1685,235 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_ServiceDeregistration_OutProbation asserts that in steady
|
||||
// state we remove any services we don't recognize locally
|
||||
func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
||||
|
||||
remainingTask := testTask()
|
||||
remainingTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "remaining-service",
|
||||
PortLabel: "x",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
|
||||
remainingTask.Name, remainingTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
explicitlyRemovedTask := testTask()
|
||||
explicitlyRemovedTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "explicitly-removed-service",
|
||||
PortLabel: "y",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
|
||||
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
// we register a task through nomad API then remove it out of band
|
||||
outofbandTask := testTask()
|
||||
outofbandTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "unknown-service",
|
||||
PortLabel: "x",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
|
||||
outofbandTask.Name, outofbandTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
||||
require.NoError(ctx.syncOnce())
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
// remove outofbandTask from local services so it appears unknown to client
|
||||
require.Len(ctx.ServiceClient.services, 3)
|
||||
require.Len(ctx.ServiceClient.checks, 3)
|
||||
|
||||
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
||||
delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
|
||||
require.Len(ctx.ServiceClient.services, 2)
|
||||
require.Len(ctx.ServiceClient.checks, 2)
|
||||
|
||||
// Sync and ensure that explicitly removed service as well as outofbandTask were removed
|
||||
|
||||
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
}
|
||||
|
||||
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
|
||||
// we only deregister services that were explicitly removed and leave unknown
|
||||
// services untouched. This adds a grace period for restoring recovered tasks
|
||||
// before deregistering them
|
||||
func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour)
|
||||
|
||||
remainingTask := testTask()
|
||||
remainingTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "remaining-service",
|
||||
PortLabel: "x",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
|
||||
remainingTask.Name, remainingTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
explicitlyRemovedTask := testTask()
|
||||
explicitlyRemovedTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "explicitly-removed-service",
|
||||
PortLabel: "y",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
|
||||
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
// we register a task through nomad API then remove it out of band
|
||||
outofbandTask := testTask()
|
||||
outofbandTask.Services = []*structs.Service{
|
||||
{
|
||||
Name: "unknown-service",
|
||||
PortLabel: "x",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "check",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
|
||||
outofbandTask.Name, outofbandTask.Services[0], false)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
||||
require.NoError(ctx.syncOnce())
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
// remove outofbandTask from local services so it appears unknown to client
|
||||
require.Len(ctx.ServiceClient.services, 3)
|
||||
require.Len(ctx.ServiceClient.checks, 3)
|
||||
|
||||
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
||||
delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
|
||||
require.Len(ctx.ServiceClient.services, 2)
|
||||
require.Len(ctx.ServiceClient.checks, 2)
|
||||
|
||||
// Sync and ensure that explicitly removed service was removed, but outofbandTask remains
|
||||
|
||||
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
|
||||
// after probation, outofband services and checks are removed
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
||||
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user