switch events to provider id, delay repsponse

This commit is contained in:
Umputun
2021-04-12 02:09:34 -05:00
parent a79cd3c7a4
commit 8b3889d4e5
7 changed files with 58 additions and 43 deletions

View File

@@ -4,7 +4,7 @@ GITREV=$(shell git describe --abbrev=7 --always --tags)
REV=$(GITREV)-$(BRANCH)-$(shell date +%Y%m%d-%H:%M:%S)
docker:
docker build -t umputun/reproxy .
docker build -t umputun/reproxy:master .
dist:
- @mkdir -p dist

View File

@@ -10,6 +10,7 @@ import (
"sort"
"strings"
"sync"
"time"
log "github.com/go-pkgz/lgr"
)
@@ -21,6 +22,7 @@ type Service struct {
providers []Provider
mappers map[string][]URLMapper
lock sync.RWMutex
interval time.Duration
}
// URLMapper contains all info about source and destination routes
@@ -34,7 +36,7 @@ type URLMapper struct {
// Provider defines sources of mappers
type Provider interface {
Events(ctx context.Context) (res <-chan struct{})
Events(ctx context.Context) (res <-chan ProviderID)
List() (res []URLMapper, err error)
}
@@ -50,27 +52,34 @@ const (
// NewService makes service with given providers
func NewService(providers []Provider) *Service {
return &Service{providers: providers}
return &Service{providers: providers, interval: time.Second}
}
// Run runs blocking loop getting events from all providers
// and updating all mappers on each event
func (s *Service) Run(ctx context.Context) error {
evChs := make([]<-chan struct{}, 0, len(s.providers))
evChs := make([]<-chan ProviderID, 0, len(s.providers))
for _, p := range s.providers {
evChs = append(evChs, p.Events(ctx))
}
ch := s.mergeEvents(ctx, evChs...)
var evRecv bool
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ch:
log.Printf("[DEBUG] new update event received")
case ev := <-ch:
log.Printf("[DEBUG] new update event received, %s", ev)
evRecv = true
case <-time.After(s.interval):
if !evRecv {
continue
}
evRecv = false
lst := s.mergeLists()
for _, m := range lst {
log.Printf("[INFO] match for %s: %s %s %s", m.ProviderID, m.Server, m.SrcMatch.String(), m.Dst)
log.Printf("[INFO] match for %s: %s %s -> %s", m.ProviderID, m.Server, m.SrcMatch.String(), m.Dst)
}
s.lock.Lock()
s.mappers = make(map[string][]URLMapper)
@@ -169,11 +178,11 @@ func (s *Service) extendMapper(m URLMapper) URLMapper {
return res
}
func (s *Service) mergeEvents(ctx context.Context, chs ...<-chan struct{}) <-chan struct{} {
func (s *Service) mergeEvents(ctx context.Context, chs ...<-chan ProviderID) <-chan ProviderID {
var wg sync.WaitGroup
out := make(chan struct{})
out := make(chan ProviderID)
output := func(ctx context.Context, c <-chan struct{}) {
output := func(ctx context.Context, c <-chan ProviderID) {
defer wg.Done()
for {
select {

View File

@@ -13,9 +13,9 @@ import (
func TestService_Do(t *testing.T) {
p1 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
res := make(chan struct{}, 1)
res <- struct{}{}
EventsFunc: func(ctx context.Context) <-chan ProviderID {
res := make(chan ProviderID, 1)
res <- PIFile
return res
},
ListFunc: func() ([]URLMapper, error) {
@@ -28,8 +28,8 @@ func TestService_Do(t *testing.T) {
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
return make(chan struct{}, 1)
EventsFunc: func(ctx context.Context) <-chan ProviderID {
return make(chan ProviderID, 1)
},
ListFunc: func() ([]URLMapper, error) {
return []URLMapper{
@@ -39,7 +39,7 @@ func TestService_Do(t *testing.T) {
},
}
svc := NewService([]Provider{p1, p2})
svc.interval = time.Millisecond * 100
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
@@ -62,9 +62,9 @@ func TestService_Do(t *testing.T) {
func TestService_Match(t *testing.T) {
p1 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
res := make(chan struct{}, 1)
res <- struct{}{}
EventsFunc: func(ctx context.Context) <-chan ProviderID {
res := make(chan ProviderID, 1)
res <- PIFile
return res
},
ListFunc: func() ([]URLMapper, error) {
@@ -76,8 +76,8 @@ func TestService_Match(t *testing.T) {
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
return make(chan struct{}, 1)
EventsFunc: func(ctx context.Context) <-chan ProviderID {
return make(chan ProviderID, 1)
},
ListFunc: func() ([]URLMapper, error) {
return []URLMapper{
@@ -86,7 +86,7 @@ func TestService_Match(t *testing.T) {
},
}
svc := NewService([]Provider{p1, p2})
svc.interval = time.Millisecond * 100
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
@@ -120,9 +120,9 @@ func TestService_Match(t *testing.T) {
func TestService_Servers(t *testing.T) {
p1 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
res := make(chan struct{}, 1)
res <- struct{}{}
EventsFunc: func(ctx context.Context) <-chan ProviderID {
res := make(chan ProviderID, 1)
res <- PIFile
return res
},
ListFunc: func() ([]URLMapper, error) {
@@ -134,8 +134,8 @@ func TestService_Servers(t *testing.T) {
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
return make(chan struct{}, 1)
EventsFunc: func(ctx context.Context) <-chan ProviderID {
return make(chan ProviderID, 1)
},
ListFunc: func() ([]URLMapper, error) {
return []URLMapper{
@@ -148,6 +148,7 @@ func TestService_Servers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
svc := NewService([]Provider{p1, p2})
svc.interval = time.Millisecond * 100
err := svc.Run(ctx)
require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

View File

@@ -45,8 +45,8 @@ type containerInfo struct {
}
// Events gets eventsCh with all containers-related docker events events
func (d *Docker) Events(ctx context.Context) (res <-chan struct{}) {
eventsCh := make(chan struct{})
func (d *Docker) Events(ctx context.Context) (res <-chan discovery.ProviderID) {
eventsCh := make(chan discovery.ProviderID)
go func() {
defer close(eventsCh)
// loop over to recover from failed events call
@@ -103,7 +103,7 @@ func (d *Docker) List() ([]discovery.URLMapper, error) {
// activate starts blocking listener for all docker events
// filters everything except "container" type, detects stop/start events and publishes signals to eventsCh
func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error {
func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan discovery.ProviderID) error {
dockerEventsCh := make(chan *dc.APIEvents)
err := client.AddEventListenerWithOptions(dc.EventsOptions{
Filters: map[string][]string{"type": {"container"}, "event": {"start", "die", "destroy", "restart", "pause"}}},
@@ -112,7 +112,7 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan
return errors.Wrap(err, "can't add even listener")
}
eventsCh <- struct{}{} // initial emmit
eventsCh <- discovery.PIDocker // initial emmit
for {
select {
case <-ctx.Done():
@@ -128,8 +128,8 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan
log.Printf("[DEBUG] container %s excluded", containerName)
continue
}
log.Printf("[INFO] new event %+v", ev)
eventsCh <- struct{}{}
log.Printf("[INFO] new docker event: container %s, status %s", containerName, ev.Status)
eventsCh <- discovery.PIDocker
}
}
}

View File

@@ -22,13 +22,13 @@ type File struct {
}
// Events returns channel updating on file change only
func (d *File) Events(ctx context.Context) <-chan struct{} {
res := make(chan struct{})
func (d *File) Events(ctx context.Context) <-chan discovery.ProviderID {
res := make(chan discovery.ProviderID)
// no need to queue multiple events
trySubmit := func(ch chan struct{}) bool {
trySubmit := func(ch chan discovery.ProviderID) bool {
select {
case ch <- struct{}{}:
case ch <- discovery.PIFile:
return true
default:
return false

View File

@@ -2,6 +2,7 @@ package provider
import (
"context"
"log"
"regexp"
"strings"
@@ -16,9 +17,9 @@ type Static struct {
}
// Events returns channel updating once
func (s *Static) Events(_ context.Context) <-chan struct{} {
res := make(chan struct{}, 1)
res <- struct{}{}
func (s *Static) Events(_ context.Context) <-chan discovery.ProviderID {
res := make(chan discovery.ProviderID, 1)
res <- discovery.PIStatic
return res
}
@@ -45,8 +46,12 @@ func (s *Static) List() (res []discovery.URLMapper, err error) {
}
for _, r := range s.Rules {
if strings.TrimSpace(r) == "" {
continue
}
um, err := parse(r)
if err != nil {
log.Printf("[DEBUG] invalid rule %s, %v", r, err)
return nil, err
}
res = append(res, um)

View File

@@ -18,7 +18,7 @@ var _ Provider = &ProviderMock{}
//
// // make and configure a mocked Provider
// mockedProvider := &ProviderMock{
// EventsFunc: func(ctx context.Context) <-chan struct{} {
// EventsFunc: func(ctx context.Context) <-chan ProviderID {
// panic("mock out the Events method")
// },
// ListFunc: func() ([]URLMapper, error) {
@@ -32,7 +32,7 @@ var _ Provider = &ProviderMock{}
// }
type ProviderMock struct {
// EventsFunc mocks the Events method.
EventsFunc func(ctx context.Context) <-chan struct{}
EventsFunc func(ctx context.Context) <-chan ProviderID
// ListFunc mocks the List method.
ListFunc func() ([]URLMapper, error)
@@ -53,7 +53,7 @@ type ProviderMock struct {
}
// Events calls EventsFunc.
func (mock *ProviderMock) Events(ctx context.Context) <-chan struct{} {
func (mock *ProviderMock) Events(ctx context.Context) <-chan ProviderID {
if mock.EventsFunc == nil {
panic("ProviderMock.EventsFunc: method is nil but Provider.Events was just called")
}