Added a test for the sync

This commit is contained in:
Diptanu Choudhury
2016-03-23 11:31:04 -07:00
parent 617e0c3ddf
commit 92e0cd6b3f
2 changed files with 119 additions and 19 deletions

View File

@@ -40,7 +40,7 @@ const (
syncInterval = 5 * time.Second
)
func NewConsulService(config *ConsulConfig, logger *log.Logger, task *structs.Task) (*ConsulService, error) {
func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
@@ -79,22 +79,21 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, task *structs.Ta
return nil, err
}
consulService := ConsulService{
client: c,
task: task,
logger: logger,
client: c,
logger: logger,
services: make(map[string]*consul.AgentService),
checks: make(map[string]*structs.ServiceCheck),
shutdownCh: make(chan struct{}),
}
return &consulService, nil
}
func (c *ConsulService) SyncTask(task *structs.Task) error {
var mErr multierror.Error
c.task = task
var services map[string]*consul.AgentService
var checks map[string]*structs.ServiceCheck
services := make(map[string]*consul.AgentService)
checks := make(map[string]*structs.ServiceCheck)
// Register Services and Checks that we don't know about or has changed
for _, service := range task.Services {
@@ -216,13 +215,12 @@ func (c *ConsulService) deregisterCheck(ID string) error {
return c.client.Agent().CheckDeregister(ID)
}
func (c *ConsulService) Sync() {
func (c *ConsulService) SyncWithConsul() {
sync := time.After(syncInterval)
for {
select {
case <-sync:
if err := c.performSync; err != nil {
if err := c.performSync(); err != nil {
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err)
}
sync = time.After(syncInterval)
@@ -233,43 +231,53 @@ func (c *ConsulService) Sync() {
}
}
func (c *ConsulService) performSync() {
func (c *ConsulService) performSync() error {
var mErr multierror.Error
cServices, err := c.client.Agent().Services()
if err != nil {
return
return err
}
cServices = c.filterConsulServices(cServices)
cChecks, err := c.client.Agent().Checks()
if err != nil {
return
return err
}
cChecks = c.filterConsulChecks(cChecks)
// Remove services and checks that consul has but we don't have anymore
for serviceID, _ := range cServices {
if _, ok := c.services[serviceID]; !ok {
c.deregisterService(serviceID)
if err := c.deregisterService(serviceID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
for checkID, _ := range cChecks {
if _, ok := c.checks[checkID]; !ok {
c.deregisterCheck(checkID)
if err := c.deregisterCheck(checkID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
// Add services and checks that consul doesn't have but we do
for serviceID, service := range c.services {
if _, ok := cServices[serviceID]; !ok {
c.registerService(service)
if err := c.registerService(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
for checkID, check := range c.checks {
if chk, ok := cChecks[checkID]; !ok {
c.registerCheck(check, c.services[chk.ServiceID])
if err := c.registerCheck(check, c.services[chk.ServiceID]); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
return mErr.ErrorOrNil()
}
// filterConsulServices prunes out all the service whose ids are not prefixed

View File

@@ -0,0 +1,92 @@
package consul
import (
"log"
"os"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
var logger = log.New(os.Stdout, "", log.LstdFlags)
func TestConsulServiceRegisterServices(t *testing.T) {
cs, err := NewConsulService(&ConsulConfig{}, logger)
if err != nil {
t.Fatalf("Err: %v", err)
}
task := structs.Task{
Name: "foo",
Services: []*structs.Service{
&structs.Service{
ID: "1",
Name: "foo-1",
Tags: []string{"tag1", "tag2"},
PortLabel: "port1",
},
&structs.Service{
ID: "2",
Name: "foo-2",
Tags: []string{"tag1", "tag2"},
PortLabel: "port2",
},
},
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
IP: "10.10.11.5",
DynamicPorts: []structs.Port{
structs.Port{
Label: "port1",
Value: 20002,
},
structs.Port{
Label: "port2",
Value: 20003,
},
},
},
},
},
}
if err := cs.SyncTask(&task); err != nil {
t.Fatalf("err: %v", err)
}
go cs.SyncWithConsul()
time.Sleep(1 * time.Second)
services, _ := cs.client.Agent().Services()
if _, ok := services[task.Services[0].ID]; !ok {
t.Fatalf("Service with ID 1 not registered")
}
task.Services = []*structs.Service{
&structs.Service{
ID: "1",
Name: "foo-1",
Tags: []string{"tag1"},
PortLabel: "port1",
},
}
if err := cs.SyncTask(&task); err != nil {
t.Fatalf("err: %v", err)
}
services, _ = cs.client.Agent().Services()
if _, ok := services["2"]; ok {
t.Fatalf("Service with ID 2 should not be registered")
}
if err := cs.Deregister(); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(1 * time.Second)
services, _ = cs.client.Agent().Services()
if _, ok := services["2"]; ok {
t.Fatalf("Service with ID 2 should not be registered")
}
if _, ok := services["1"]; ok {
t.Fatalf("Service with ID 1 should not be registered")
}
}