Merge pull request #4365 from hashicorp/b-consul-server-resync

Fix consul server resync when server and client point to the same Consul agent
This commit is contained in:
Preetha
2018-06-01 14:50:55 -07:00
committed by GitHub
5 changed files with 26 additions and 9 deletions

View File

@@ -143,7 +143,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
vclient := vaultclient.NewMockVaultClient()
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, logger)
serviceClient := consul.NewServiceClient(cclient, logger, true)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
@@ -1860,7 +1860,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
// backed by a mock consul whose checks are always unhealthy.
consulAgent := consul.NewMockAgent()
consulAgent.SetStatus("critical")
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger)
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger, true)
go consulClient.Run()
defer consulClient.Shutdown()

View File

@@ -958,7 +958,7 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
a.consulCatalog = client.Catalog()
// Create Consul Service client for service advertisement and checks.
a.consulService = consul.NewServiceClient(client.Agent(), a.logger)
a.consulService = consul.NewServiceClient(client.Agent(), a.logger, a.Client() != nil)
// Run the Consul service client's sync'ing main loop
go a.consulService.Run()

View File

@@ -232,11 +232,17 @@ type ServiceClient struct {
// checkWatcher restarts checks that are unhealthy.
checkWatcher *checkWatcher
// isClientAgent specifies whether this Consul client is being used
// by a Nomad client.
isClientAgent bool
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
// Client, logger and takes whether the client is being used by a Nomad Client agent.
// When being used by a Nomad client, this Consul client reconciles all services and
// checks created by Nomad on behalf of running tasks.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger, isNomadClient bool) *ServiceClient {
return &ServiceClient{
client: consulClient,
logger: logger,
@@ -255,6 +261,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
}
}
@@ -433,7 +440,12 @@ func (c *ServiceClient) sync() error {
// Known service, skip
continue
}
if !isNomadService(id) {
// Ignore if this is not a Nomad managed service. Also ignore
// Nomad managed services if this is not a client agent.
// This is to prevent server agents from removing services
// registered by client agents
if !isNomadService(id) || !c.isClientAgent {
// Not managed by Nomad, skip
continue
}
@@ -470,7 +482,12 @@ func (c *ServiceClient) sync() error {
// Known check, leave it
continue
}
if !isNomadService(check.ServiceID) {
// Ignore if this is not a Nomad managed check. Also ignore
// Nomad managed checks if this is not a client agent.
// This is to prevent server agents from removing checks
// registered by client agents
if !isNomadService(check.ServiceID) || !c.isClientAgent {
// Service not managed by Nomad, skip
continue
}

View File

@@ -142,7 +142,7 @@ func TestConsul_Integration(t *testing.T) {
consulClient, err := consulapi.NewClient(consulConfig)
assert.Nil(err)
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger)
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger, true)
defer serviceClient.Shutdown() // just-in-case cleanup
consulRan := make(chan struct{})
go func() {

View File

@@ -117,7 +117,7 @@ func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
tt := testTask()
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testlog.Logger(t)),
ServiceClient: NewServiceClient(fc, testlog.Logger(t), true),
FakeConsul: fc,
Task: tt,
MockExec: tt.DriverExec.(*mockExec),