mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 09:25:46 +03:00
@@ -174,6 +174,8 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
|
||||
trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration),
|
||||
checkRunners: make(map[consulCheckID]*CheckRunner),
|
||||
periodicCallbacks: make(map[string]types.PeriodicCallback),
|
||||
// default noop implementation of addrFinder
|
||||
addrFinder: func(string) (string, int) { return "", 0 },
|
||||
}
|
||||
|
||||
return &consulSyncer, nil
|
||||
@@ -264,22 +266,47 @@ func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*stru
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Update the services and checks groups for this domain
|
||||
c.groupsLock.Lock()
|
||||
for serviceKey, service := range registeredServices {
|
||||
serviceKeys, ok := c.servicesGroups[domain]
|
||||
if !ok {
|
||||
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
|
||||
c.servicesGroups[domain] = serviceKeys
|
||||
|
||||
// Create map for service group if it doesn't exist
|
||||
serviceKeys, ok := c.servicesGroups[domain]
|
||||
if !ok {
|
||||
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
|
||||
c.servicesGroups[domain] = serviceKeys
|
||||
}
|
||||
|
||||
// Remove stale services
|
||||
for existingServiceKey := range serviceKeys {
|
||||
if _, ok := registeredServices[existingServiceKey]; !ok {
|
||||
// Exisitng service needs to be removed
|
||||
delete(serviceKeys, existingServiceKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Add registered services
|
||||
for serviceKey, service := range registeredServices {
|
||||
serviceKeys[serviceKey] = service
|
||||
}
|
||||
for serviceKey, checks := range registeredChecks {
|
||||
serviceKeys, ok := c.checkGroups[domain]
|
||||
if !ok {
|
||||
serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
|
||||
c.checkGroups[domain] = serviceKeys
|
||||
|
||||
// Create map for check group if it doesn't exist
|
||||
checkKeys, ok := c.checkGroups[domain]
|
||||
if !ok {
|
||||
checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
|
||||
c.checkGroups[domain] = checkKeys
|
||||
}
|
||||
|
||||
// Remove stale checks
|
||||
for existingCheckKey := range checkKeys {
|
||||
if _, ok := registeredChecks[existingCheckKey]; !ok {
|
||||
// Exisitng check needs to be removed
|
||||
delete(checkKeys, existingCheckKey)
|
||||
}
|
||||
serviceKeys[serviceKey] = checks
|
||||
}
|
||||
|
||||
// Add registered checks
|
||||
for checkKey, checks := range registeredChecks {
|
||||
checkKeys[checkKey] = checks
|
||||
}
|
||||
c.groupsLock.Unlock()
|
||||
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs/config"
|
||||
)
|
||||
@@ -19,45 +19,7 @@ const (
|
||||
serviceGroupName = "executor"
|
||||
)
|
||||
|
||||
var (
|
||||
logger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
check1 = structs.ServiceCheck{
|
||||
Name: "check-foo-1",
|
||||
Type: structs.ServiceCheckTCP,
|
||||
Interval: 30 * time.Second,
|
||||
Timeout: 5 * time.Second,
|
||||
InitialStatus: api.HealthPassing,
|
||||
}
|
||||
check2 = structs.ServiceCheck{
|
||||
Name: "check1",
|
||||
Type: "tcp",
|
||||
PortLabel: "port2",
|
||||
Interval: 3 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
}
|
||||
check3 = structs.ServiceCheck{
|
||||
Name: "check3",
|
||||
Type: "http",
|
||||
PortLabel: "port3",
|
||||
Path: "/health?p1=1&p2=2",
|
||||
Interval: 3 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
}
|
||||
service1 = structs.Service{
|
||||
Name: "foo-1",
|
||||
Tags: []string{"tag1", "tag2"},
|
||||
PortLabel: "port1",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
&check1, &check2,
|
||||
},
|
||||
}
|
||||
|
||||
service2 = structs.Service{
|
||||
Name: "foo-2",
|
||||
Tags: []string{"tag1", "tag2"},
|
||||
PortLabel: "port2",
|
||||
}
|
||||
)
|
||||
var logger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
|
||||
func TestCheckRegistration(t *testing.T) {
|
||||
cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
|
||||
@@ -65,9 +27,62 @@ func TestCheckRegistration(t *testing.T) {
|
||||
t.Fatalf("Err: %v", err)
|
||||
}
|
||||
|
||||
task := mockTask()
|
||||
check1 := structs.ServiceCheck{
|
||||
Name: "check-foo-1",
|
||||
Type: structs.ServiceCheckTCP,
|
||||
Interval: 30 * time.Second,
|
||||
Timeout: 5 * time.Second,
|
||||
InitialStatus: api.HealthPassing,
|
||||
}
|
||||
check2 := structs.ServiceCheck{
|
||||
Name: "check1",
|
||||
Type: "tcp",
|
||||
PortLabel: "port2",
|
||||
Interval: 3 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
}
|
||||
check3 := structs.ServiceCheck{
|
||||
Name: "check3",
|
||||
Type: "http",
|
||||
PortLabel: "port3",
|
||||
Path: "/health?p1=1&p2=2",
|
||||
Interval: 3 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
}
|
||||
service1 := structs.Service{
|
||||
Name: "foo-1",
|
||||
Tags: []string{"tag1", "tag2"},
|
||||
PortLabel: "port1",
|
||||
Checks: []*structs.ServiceCheck{
|
||||
&check1, &check2,
|
||||
},
|
||||
}
|
||||
task := structs.Task{
|
||||
Name: "foo",
|
||||
Services: []*structs.Service{&service1},
|
||||
Resources: &structs.Resources{
|
||||
Networks: []*structs.NetworkResource{
|
||||
&structs.NetworkResource{
|
||||
IP: "10.10.11.5",
|
||||
DynamicPorts: []structs.Port{
|
||||
structs.Port{
|
||||
Label: "port1",
|
||||
Value: 20002,
|
||||
},
|
||||
structs.Port{
|
||||
Label: "port2",
|
||||
Value: 20003,
|
||||
},
|
||||
structs.Port{
|
||||
Label: "port3",
|
||||
Value: 20004,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
cs.SetAddrFinder(task.FindHostAndPortFor)
|
||||
|
||||
srvReg, _ := cs.createService(&service1, "domain", "key")
|
||||
check1Reg, _ := cs.createCheckReg(&check1, srvReg)
|
||||
check2Reg, _ := cs.createCheckReg(&check2, srvReg)
|
||||
@@ -95,148 +110,136 @@ func TestCheckRegistration(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConsulServiceRegisterServices(t *testing.T) {
|
||||
t.Skip()
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
cs, err := NewSyncer(config.DefaultConsulConfig(), shutdownCh, logger)
|
||||
cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Err: %v", err)
|
||||
}
|
||||
defer cs.Shutdown()
|
||||
// Skipping the test if consul isn't present
|
||||
if !cs.consulPresent() {
|
||||
return
|
||||
t.Skip("skipping because consul isn't present")
|
||||
}
|
||||
task := mockTask()
|
||||
//cs.SetServiceRegPrefix(serviceRegPrefix)
|
||||
cs.SetAddrFinder(task.FindHostAndPortFor)
|
||||
|
||||
service1 := &structs.Service{Name: "foo", Tags: []string{"a", "b"}}
|
||||
service2 := &structs.Service{Name: "foo"}
|
||||
services := map[ServiceKey]*structs.Service{
|
||||
GenerateServiceKey(service1): service1,
|
||||
GenerateServiceKey(service2): service2,
|
||||
}
|
||||
|
||||
// Call SetServices to update services in consul
|
||||
if err := cs.SetServices(serviceGroupName, services); err != nil {
|
||||
t.Fatalf("error setting services: %v", err)
|
||||
}
|
||||
|
||||
// Manually call SyncServers to cause a synchronous consul update
|
||||
if err := cs.SyncServices(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
t.Fatalf("error syncing services: %v", err)
|
||||
}
|
||||
defer cs.Shutdown()
|
||||
|
||||
// service1 := &structs.Service{Name: task.Name}
|
||||
// service2 := &structs.Service{Name: task.Name}
|
||||
//services := []*structs.Service{service1, service2}
|
||||
//service1.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service1), task.Name, allocID)
|
||||
//service2.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service2), task.Name, allocID)
|
||||
numservices := len(cs.flattenedServices())
|
||||
if numservices != 2 {
|
||||
t.Fatalf("expected 2 services but found %d", numservices)
|
||||
}
|
||||
|
||||
//cs.SetServices(serviceGroupName, services)
|
||||
// if err := servicesPresent(t, services, cs); err != nil {
|
||||
// t.Fatalf("err : %v", err)
|
||||
// }
|
||||
// FIXME(sean@)
|
||||
// if err := checksPresent(t, []string{check1.Hash(service1ID)}, cs); err != nil {
|
||||
// t.Fatalf("err : %v", err)
|
||||
// }
|
||||
numchecks := len(cs.flattenedChecks())
|
||||
if numchecks != 0 {
|
||||
t.Fatalf("expected 0 checks but found %d", numchecks)
|
||||
}
|
||||
|
||||
agentServices, err := cs.queryAgentServices()
|
||||
if err != nil {
|
||||
t.Fatalf("error querying consul services: %v", err)
|
||||
}
|
||||
if len(agentServices) != numservices {
|
||||
t.Fatalf("expected %d services in consul but found %d:\n%#v", numservices, len(agentServices), agentServices)
|
||||
}
|
||||
|
||||
agentChecks, err := cs.queryChecks()
|
||||
if err != nil {
|
||||
t.Fatalf("error querying consul checks: %v", err)
|
||||
}
|
||||
if len(agentChecks) != numchecks {
|
||||
t.Fatalf("expected %d checks in consul but found %d:\n%#v", numservices, len(agentChecks), agentChecks)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsulServiceUpdateService(t *testing.T) {
|
||||
t.Skip()
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
cs, err := NewSyncer(config.DefaultConsulConfig(), shutdownCh, logger)
|
||||
cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Err: %v", err)
|
||||
}
|
||||
defer cs.Shutdown()
|
||||
// Skipping the test if consul isn't present
|
||||
if !cs.consulPresent() {
|
||||
return
|
||||
t.Skip("skipping because consul isn't present")
|
||||
}
|
||||
cs.SetAddrFinder(func(h string) (string, int) {
|
||||
a, pstr, _ := net.SplitHostPort(h)
|
||||
p, _ := net.LookupPort("tcp", pstr)
|
||||
return a, p
|
||||
})
|
||||
|
||||
task := mockTask()
|
||||
//cs.SetServiceRegPrefix(serviceRegPrefix)
|
||||
cs.SetAddrFinder(task.FindHostAndPortFor)
|
||||
service1 := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
|
||||
service2 := &structs.Service{Name: "foo2"}
|
||||
services := map[ServiceKey]*structs.Service{
|
||||
GenerateServiceKey(service1): service1,
|
||||
GenerateServiceKey(service2): service2,
|
||||
}
|
||||
if err := cs.SetServices(serviceGroupName, services); err != nil {
|
||||
t.Fatalf("error setting services: %v", err)
|
||||
}
|
||||
if err := cs.SyncServices(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
t.Fatalf("error syncing services: %v", err)
|
||||
}
|
||||
defer cs.Shutdown()
|
||||
|
||||
//Update Service defn 1
|
||||
newTags := []string{"tag3"}
|
||||
task.Services[0].Tags = newTags
|
||||
// Now update both services
|
||||
service1 = &structs.Service{Name: "foo1", Tags: []string{"a", "z"}}
|
||||
service2 = &structs.Service{Name: "foo2", PortLabel: ":8899"}
|
||||
service3 := &structs.Service{Name: "foo3"}
|
||||
services = map[ServiceKey]*structs.Service{
|
||||
GenerateServiceKey(service1): service1,
|
||||
GenerateServiceKey(service2): service2,
|
||||
GenerateServiceKey(service3): service3,
|
||||
}
|
||||
if err := cs.SetServices(serviceGroupName, services); err != nil {
|
||||
t.Fatalf("error setting services: %v", err)
|
||||
}
|
||||
if err := cs.SyncServices(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
t.Fatalf("error syncing services: %v", err)
|
||||
}
|
||||
// Make sure all the services and checks are still present
|
||||
// service1 := &structs.Service{Name: task.Name}
|
||||
// service2 := &structs.Service{Name: task.Name}
|
||||
//services := []*structs.Service{service1, service2}
|
||||
//service1.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service1), task.Name, allocID)
|
||||
//service2.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service2), task.Name, allocID)
|
||||
// if err := servicesPresent(t, services, cs); err != nil {
|
||||
// t.Fatalf("err : %v", err)
|
||||
// }
|
||||
// FIXME(sean@)
|
||||
// if err := checksPresent(t, []string{check1.Hash(service1ID)}, cs); err != nil {
|
||||
// t.Fatalf("err : %v", err)
|
||||
// }
|
||||
|
||||
// check if service defn 1 has been updated
|
||||
// consulServices, err := cs.client.Agent().Services()
|
||||
// if err != nil {
|
||||
// t.Fatalf("errL: %v", err)
|
||||
// }
|
||||
// srv, _ := consulServices[service1.ServiceID]
|
||||
// if !reflect.DeepEqual(srv.Tags, newTags) {
|
||||
// t.Fatalf("expected tags: %v, actual: %v", newTags, srv.Tags)
|
||||
// }
|
||||
}
|
||||
|
||||
// func servicesPresent(t *testing.T, configuredServices []*structs.Service, syncer *Syncer) error {
|
||||
// var mErr multierror.Error
|
||||
// // services, err := syncer.client.Agent().Services()
|
||||
// // if err != nil {
|
||||
// // t.Fatalf("err: %v", err)
|
||||
// // }
|
||||
|
||||
// // for _, configuredService := range configuredServices {
|
||||
// // if _, ok := services[configuredService.ServiceID]; !ok {
|
||||
// // mErr.Errors = append(mErr.Errors, fmt.Errorf("service ID %q not synced", configuredService.ServiceID))
|
||||
// // }
|
||||
// // }
|
||||
// return mErr.ErrorOrNil()
|
||||
// }
|
||||
|
||||
func checksPresent(t *testing.T, checkIDs []string, syncer *Syncer) error {
|
||||
var mErr multierror.Error
|
||||
checks, err := syncer.client.Agent().Checks()
|
||||
agentServices, err := cs.queryAgentServices()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
t.Fatalf("error querying consul services: %v", err)
|
||||
}
|
||||
if len(agentServices) != 3 {
|
||||
t.Fatalf("expected 3 services in consul but found %d:\n%#v", len(agentServices), agentServices)
|
||||
}
|
||||
|
||||
for _, checkID := range checkIDs {
|
||||
if _, ok := checks[checkID]; !ok {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("check ID %q not synced", checkID))
|
||||
found := 0
|
||||
for _, s := range cs.flattenedServices() {
|
||||
switch s.Name {
|
||||
case "foo1":
|
||||
found++
|
||||
if !reflect.DeepEqual(service1.Tags, s.Tags) {
|
||||
t.Errorf("incorrect tags on foo1:\n expected: %v\n found: %v", service1.Tags, s.Tags)
|
||||
}
|
||||
case "foo2":
|
||||
found++
|
||||
if s.Address != "" {
|
||||
t.Errorf("expected empty host on foo2 but found %q", s.Address)
|
||||
}
|
||||
if s.Port != 8899 {
|
||||
t.Errorf("expected port 8899 on foo2 but found %d", s.Port)
|
||||
}
|
||||
case "foo3":
|
||||
found++
|
||||
default:
|
||||
t.Errorf("unexpected service: %s", s.Name)
|
||||
}
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func mockTask() *structs.Task {
|
||||
task := structs.Task{
|
||||
Name: "foo",
|
||||
Services: []*structs.Service{&service1, &service2},
|
||||
Resources: &structs.Resources{
|
||||
Networks: []*structs.NetworkResource{
|
||||
&structs.NetworkResource{
|
||||
IP: "10.10.11.5",
|
||||
DynamicPorts: []structs.Port{
|
||||
structs.Port{
|
||||
Label: "port1",
|
||||
Value: 20002,
|
||||
},
|
||||
structs.Port{
|
||||
Label: "port2",
|
||||
Value: 20003,
|
||||
},
|
||||
structs.Port{
|
||||
Label: "port3",
|
||||
Value: 20004,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
if found != 3 {
|
||||
t.Fatalf("expected 3 services locally but found %d", found)
|
||||
}
|
||||
return &task
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user