diff --git a/command/agent/consul/chaos_test.go b/command/agent/consul/chaos_test.go
new file mode 100644
index 000000000..89b69ea2c
--- /dev/null
+++ b/command/agent/consul/chaos_test.go
@@ -0,0 +1,193 @@
+// +build chaos
+
+package consul
+
+import (
+	"fmt"
+	"io/ioutil"
+	"sort"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/nomad/structs/config"
+)
+
+func TestSyncerChaos(t *testing.T) {
+	// Create an embedded Consul server
+	testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
+		// If -v wasn't specified squelch consul logging
+		if !testing.Verbose() {
+			c.Stdout = ioutil.Discard
+			c.Stderr = ioutil.Discard
+		}
+	})
+	defer testconsul.Stop()
+
+	// Configure Syncer to talk to the test server
+	cconf := config.DefaultConsulConfig()
+	cconf.Addr = testconsul.HTTPAddr
+
+	clientSyncer, err := NewSyncer(cconf, nil, logger)
+	if err != nil {
+		t.Fatalf("Error creating Syncer: %v", err)
+	}
+	defer clientSyncer.Shutdown()
+
+	execSyncer, err := NewSyncer(cconf, nil, logger)
+	if err != nil {
+		t.Fatalf("Error creating Syncer: %v", err)
+	}
+	defer execSyncer.Shutdown()
+
+	clientService := &structs.Service{Name: "nomad-client"}
+	services := map[ServiceKey]*structs.Service{
+		GenerateServiceKey(clientService): clientService,
+	}
+	if err := clientSyncer.SetServices("client", services); err != nil {
+		t.Fatalf("error setting client service: %v", err)
+	}
+
+	const execn = 100
+	const reapern = 2
+	errors := make(chan error, 100)
+	wg := sync.WaitGroup{}
+
+	// Start goroutines to concurrently SetServices
+	for i := 0; i < execn; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			domain := ServiceDomain(fmt.Sprintf("exec-%d", i))
+			services := map[ServiceKey]*structs.Service{}
+			for ii := 0; ii < 10; ii++ {
+				s := &structs.Service{Name: fmt.Sprintf("exec-%d-%d", i, ii)}
+				services[GenerateServiceKey(s)] = s
+				if err := execSyncer.SetServices(domain, services); err != nil {
+					select {
+					case errors <- err:
+					default:
+					}
+					return
+				}
+				time.Sleep(1)
+			}
+		}(i)
+	}
+
+	// SyncServices runs a timer started by Syncer.Run which we don't use
+	// in this test, so run SyncServices concurrently
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < execn; i++ {
+			if err := execSyncer.SyncServices(); err != nil {
+				select {
+				case errors <- err:
+				default:
+				}
+				return
+			}
+			time.Sleep(100)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		if err := clientSyncer.ReapUnmatched([]ServiceDomain{"nomad-client"}); err != nil {
+			select {
+			case errors <- err:
+			default:
+			}
+			return
+		}
+	}()
+
+	// Reap all but exec-0-*
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < execn; i++ {
+			if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0", ServiceDomain(fmt.Sprintf("exec-%d", i))}); err != nil {
+				select {
+				case errors <- err:
+				default:
+				}
+			}
+			time.Sleep(100)
+		}
+	}()
+
+	go func() {
+		wg.Wait()
+		close(errors)
+	}()
+
+	for err := range errors {
+		if err != nil {
+			t.Errorf("error setting service from executor goroutine: %v", err)
+		}
+	}
+
+	// Do a final ReapUnmatched to get consul back into a deterministic state
+	if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0"}); err != nil {
+		t.Fatalf("error doing final reap: %v", err)
+	}
+
+	// flattenedServices should be fully populated as ReapUnmatched doesn't
+	// touch Syncer's internal state
+	expected := map[string]struct{}{}
+	for i := 0; i < execn; i++ {
+		for ii := 0; ii < 10; ii++ {
+			expected[fmt.Sprintf("exec-%d-%d", i, ii)] = struct{}{}
+		}
+	}
+
+	for _, s := range execSyncer.flattenedServices() {
+		_, ok := expected[s.Name]
+		if !ok {
+			t.Errorf("%s unexpected", s.Name)
+		}
+		delete(expected, s.Name)
+	}
+	if len(expected) > 0 {
+		left := []string{}
+		for s := range expected {
+			left = append(left, s)
+		}
+		sort.Strings(left)
+		t.Errorf("Couldn't find %d names in flattened services:\n%s", len(expected), strings.Join(left, "\n"))
+	}
+
+	// All but exec-0 and possibly some of exec-99 should have been reaped
+	{
+		services, err := execSyncer.client.Agent().Services()
+		if err != nil {
+			t.Fatalf("Error getting services: %v", err)
+		}
+		expected := []int{}
+		for k, service := range services {
+			if service.Service == "consul" {
+				continue
+			}
+			i := -1
+			ii := -1
+			fmt.Sscanf(service.Service, "exec-%d-%d", &i, &ii)
+			switch {
+			case i == -1 || ii == -1:
+				t.Errorf("invalid service: %s -> %s", k, service.Service)
+			case i != 0 || ii > 9:
+				t.Errorf("unexpected service: %s -> %s", k, service.Service)
+			default:
+				expected = append(expected, ii)
+			}
+		}
+		if len(expected) != 10 {
+			t.Errorf("expected 0-9 but found: %#q", expected)
+		}
+	}
+}
diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go
index 13d55f533..c111f3aaf 100644
--- a/command/agent/consul/syncer.go
+++ b/command/agent/consul/syncer.go
@@ -35,7 +35,6 @@ import (
 	"time"
 
 	consul "github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/go-multierror"
 
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -56,11 +55,11 @@ const (
 	nomadServicePrefix = "_nomad"
 
 	// The periodic time interval for syncing services and checks with Consul
-	syncInterval = 5 * time.Second
+	defaultSyncInterval = 6 * time.Second
 
-	// syncJitter provides a little variance in the frequency at which
+	// defaultSyncJitter provides a little variance in the frequency at which
 	// Syncer polls Consul.
-	syncJitter = 8
+	defaultSyncJitter = time.Second
 
 	// ttlCheckBuffer is the time interval that Nomad can take to report Consul
 	// the check result
@@ -144,6 +143,13 @@ type Syncer struct {
 	periodicCallbacks map[string]types.PeriodicCallback
 	notifySyncCh      chan struct{}
 	periodicLock      sync.RWMutex
+
+	// The periodic time interval for syncing services and checks with Consul
+	syncInterval time.Duration
+
+	// syncJitter provides a little variance in the frequency at which
+	// Syncer polls Consul.
+	syncJitter time.Duration
 }
 
 // NewSyncer returns a new consul.Syncer
@@ -168,8 +174,11 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
 		checkGroups:       make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
 		checkRunners:      make(map[consulCheckID]*CheckRunner),
 		periodicCallbacks: make(map[string]types.PeriodicCallback),
+		notifySyncCh:      make(chan struct{}, 1),
 		// default noop implementation of addrFinder
-		addrFinder: func(string) (string, int) { return "", 0 },
+		addrFinder:   func(string) (string, int) { return "", 0 },
+		syncInterval: defaultSyncInterval,
+		syncJitter:   defaultSyncJitter,
 	}
 
 	return &consulSyncer, nil
@@ -809,7 +818,7 @@ func (c *Syncer) Run() {
 	for {
 		select {
 		case <-sync.C:
-			d := syncInterval - lib.RandomStagger(syncInterval/syncJitter)
+			d := c.syncInterval - c.syncJitter
 			sync.Reset(d)
 
 			if err := c.SyncServices(); err != nil {
@@ -824,7 +833,7 @@ func (c *Syncer) Run() {
 				c.consulAvailable = true
 			}
 		case <-c.notifySyncCh:
-			sync.Reset(syncInterval)
+			sync.Reset(0)
 		case <-c.shutdownCh:
 			c.Shutdown()
 		case <-c.notifyShutdownCh:
@@ -872,8 +881,8 @@ func (c *Syncer) SyncServices() error {
 // the syncer
 func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
 	localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
-	c.registryLock.RLock()
-	defer c.registryLock.RUnlock()
+	c.groupsLock.RLock()
+	defer c.groupsLock.RUnlock()
 	for serviceID, service := range consulServices {
 		for domain := range c.servicesGroups {
 			if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
@@ -889,8 +898,8 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
 // services with Syncer's idPrefix.
 func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
 	localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
-	c.registryLock.RLock()
-	defer c.registryLock.RUnlock()
+	c.groupsLock.RLock()
+	defer c.groupsLock.RUnlock()
 	for checkID, check := range consulChecks {
 		for domain := range c.checkGroups {
 			if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
diff --git a/command/agent/consul/syncer_test.go b/command/agent/consul/syncer_test.go
index 6eb52911e..42879ca77 100644
--- a/command/agent/consul/syncer_test.go
+++ b/command/agent/consul/syncer_test.go
@@ -1,6 +1,7 @@
 package consul
 
 import (
+	"io/ioutil"
 	"log"
 	"net"
 	"os"
@@ -9,6 +10,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 )
@@ -21,6 +23,69 @@ const (
 
 var logger = log.New(os.Stdout, "", log.LstdFlags)
 
+func TestSyncNow(t *testing.T) {
+	cs, testconsul := testConsul(t)
+	defer cs.Shutdown()
+	defer testconsul.Stop()
+
+	cs.SetAddrFinder(func(h string) (string, int) {
+		a, pstr, _ := net.SplitHostPort(h)
+		p, _ := net.LookupPort("tcp", pstr)
+		return a, p
+	})
+	cs.syncInterval = 9000 * time.Hour
+
+	service := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
+	services := map[ServiceKey]*structs.Service{
+		GenerateServiceKey(service): service,
+	}
+
+	// Run syncs once on startup and then blocks forever
+	go cs.Run()
+
+	if err := cs.SetServices(serviceGroupName, services); err != nil {
+		t.Fatalf("error setting services: %v", err)
+	}
+
+	synced := false
+	for i := 0; !synced && i < 10; i++ {
+		time.Sleep(250 * time.Millisecond)
+		agentServices, err := cs.queryAgentServices()
+		if err != nil {
+			t.Fatalf("error querying consul services: %v", err)
+		}
+		synced = len(agentServices) == 1
+	}
+	if !synced {
+		t.Fatalf("initial sync never occurred")
+	}
+
+	// SetServices again should cause another sync
+	service1 := &structs.Service{Name: "foo1", Tags: []string{"Y", "Z"}}
+	service2 := &structs.Service{Name: "bar"}
+	services = map[ServiceKey]*structs.Service{
+		GenerateServiceKey(service1): service1,
+		GenerateServiceKey(service2): service2,
+	}
+
+	if err := cs.SetServices(serviceGroupName, services); err != nil {
+		t.Fatalf("error setting services: %v", err)
+	}
+
+	synced = false
+	for i := 0; !synced && i < 10; i++ {
+		time.Sleep(250 * time.Millisecond)
+		agentServices, err := cs.queryAgentServices()
+		if err != nil {
+			t.Fatalf("error querying consul services: %v", err)
+		}
+		synced = len(agentServices) == 2
+	}
+	if !synced {
+		t.Fatalf("SetServices didn't sync immediately")
+	}
+}
+
 func TestCheckRegistration(t *testing.T) {
 	cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
 	if err != nil {
@@ -109,16 +174,35 @@ func TestCheckRegistration(t *testing.T) {
 	}
 }
 
-func TestConsulServiceRegisterServices(t *testing.T) {
-	cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
+// testConsul returns a Syncer configured with an embedded Consul server.
+//
+// Callers must defer Syncer.Shutdown() and TestServer.Stop()
+//
+func testConsul(t *testing.T) (*Syncer, *testutil.TestServer) {
+	// Create an embedded Consul server
+	testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
+		// If -v wasn't specified squelch consul logging
+		if !testing.Verbose() {
+			c.Stdout = ioutil.Discard
+			c.Stderr = ioutil.Discard
+		}
+	})
+
+	// Configure Syncer to talk to the test server
+	cconf := config.DefaultConsulConfig()
+	cconf.Addr = testconsul.HTTPAddr
+
+	cs, err := NewSyncer(cconf, nil, logger)
 	if err != nil {
-		t.Fatalf("Err: %v", err)
+		t.Fatalf("Error creating Syncer: %v", err)
 	}
+	return cs, testconsul
+}
+
+func TestConsulServiceRegisterServices(t *testing.T) {
+	cs, testconsul := testConsul(t)
 	defer cs.Shutdown()
-	// Skipping the test if consul isn't present
-	if !cs.consulPresent() {
-		t.Skip("skipping because consul isn't present")
-	}
+	defer testconsul.Stop()
 
 	service1 := &structs.Service{Name: "foo", Tags: []string{"a", "b"}}
 	service2 := &structs.Service{Name: "foo"}
@@ -178,15 +262,10 @@ func TestConsulServiceRegisterServices(t *testing.T) {
 }
 
 func TestConsulServiceUpdateService(t *testing.T) {
-	cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
-	if err != nil {
-		t.Fatalf("Err: %v", err)
-	}
+	cs, testconsul := testConsul(t)
 	defer cs.Shutdown()
-	// Skipping the test if consul isn't present
-	if !cs.consulPresent() {
-		t.Skip("skipping because consul isn't present")
-	}
+	defer testconsul.Stop()
+
 	cs.SetAddrFinder(func(h string) (string, int) {
 		a, pstr, _ := net.SplitHostPort(h)
 		p, _ := net.LookupPort("tcp", pstr)