diff --git a/Makefile b/Makefile index e8ba29a..2174676 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ GITREV=$(shell git describe --abbrev=7 --always --tags) REV=$(GITREV)-$(BRANCH)-$(shell date +%Y%m%d-%H:%M:%S) docker: - docker build -t umputun/reproxy . + docker build -t umputun/reproxy:master . dist: - @mkdir -p dist diff --git a/app/discovery/discovery.go b/app/discovery/discovery.go index 0ed64da..9b27578 100644 --- a/app/discovery/discovery.go +++ b/app/discovery/discovery.go @@ -10,6 +10,7 @@ import ( "sort" "strings" "sync" + "time" log "github.com/go-pkgz/lgr" ) @@ -21,6 +22,7 @@ type Service struct { providers []Provider mappers map[string][]URLMapper lock sync.RWMutex + interval time.Duration } // URLMapper contains all info about source and destination routes @@ -34,7 +36,7 @@ type URLMapper struct { // Provider defines sources of mappers type Provider interface { - Events(ctx context.Context) (res <-chan struct{}) + Events(ctx context.Context) (res <-chan ProviderID) List() (res []URLMapper, err error) } @@ -50,27 +52,34 @@ const ( // NewService makes service with given providers func NewService(providers []Provider) *Service { - return &Service{providers: providers} + return &Service{providers: providers, interval: time.Second} } // Run runs blocking loop getting events from all providers // and updating all mappers on each event func (s *Service) Run(ctx context.Context) error { - evChs := make([]<-chan struct{}, 0, len(s.providers)) + evChs := make([]<-chan ProviderID, 0, len(s.providers)) for _, p := range s.providers { evChs = append(evChs, p.Events(ctx)) } ch := s.mergeEvents(ctx, evChs...) + var evRecv bool for { select { case <-ctx.Done(): return ctx.Err() - case <-ch: - log.Printf("[DEBUG] new update event received") + case ev := <-ch: + log.Printf("[DEBUG] new update event received, %s", ev) + evRecv = true + case <-time.After(s.interval): + if !evRecv { + continue + } + evRecv = false lst := s.mergeLists() for _, m := range lst { - log.Printf("[INFO] match for %s: %s %s %s", m.ProviderID, m.Server, m.SrcMatch.String(), m.Dst) + log.Printf("[INFO] match for %s: %s %s -> %s", m.ProviderID, m.Server, m.SrcMatch.String(), m.Dst) } s.lock.Lock() s.mappers = make(map[string][]URLMapper) @@ -169,11 +178,11 @@ func (s *Service) extendMapper(m URLMapper) URLMapper { return res } -func (s *Service) mergeEvents(ctx context.Context, chs ...<-chan struct{}) <-chan struct{} { +func (s *Service) mergeEvents(ctx context.Context, chs ...<-chan ProviderID) <-chan ProviderID { var wg sync.WaitGroup - out := make(chan struct{}) + out := make(chan ProviderID) - output := func(ctx context.Context, c <-chan struct{}) { + output := func(ctx context.Context, c <-chan ProviderID) { defer wg.Done() for { select { diff --git a/app/discovery/discovery_test.go b/app/discovery/discovery_test.go index 1a73af4..47d6bf0 100644 --- a/app/discovery/discovery_test.go +++ b/app/discovery/discovery_test.go @@ -13,9 +13,9 @@ import ( func TestService_Do(t *testing.T) { p1 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - res := make(chan struct{}, 1) - res <- struct{}{} + EventsFunc: func(ctx context.Context) <-chan ProviderID { + res := make(chan ProviderID, 1) + res <- PIFile return res }, ListFunc: func() ([]URLMapper, error) { @@ -28,8 +28,8 @@ func TestService_Do(t *testing.T) { }, } p2 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - return make(chan struct{}, 1) + EventsFunc: func(ctx context.Context) <-chan ProviderID { + return make(chan ProviderID, 1) }, ListFunc: func() ([]URLMapper, error) { return []URLMapper{ @@ -39,7 +39,7 @@ func TestService_Do(t *testing.T) { }, } svc := NewService([]Provider{p1, p2}) - + svc.interval = time.Millisecond * 100 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -62,9 +62,9 @@ func TestService_Do(t *testing.T) { func TestService_Match(t *testing.T) { p1 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - res := make(chan struct{}, 1) - res <- struct{}{} + EventsFunc: func(ctx context.Context) <-chan ProviderID { + res := make(chan ProviderID, 1) + res <- PIFile return res }, ListFunc: func() ([]URLMapper, error) { @@ -76,8 +76,8 @@ func TestService_Match(t *testing.T) { }, } p2 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - return make(chan struct{}, 1) + EventsFunc: func(ctx context.Context) <-chan ProviderID { + return make(chan ProviderID, 1) }, ListFunc: func() ([]URLMapper, error) { return []URLMapper{ @@ -86,7 +86,7 @@ func TestService_Match(t *testing.T) { }, } svc := NewService([]Provider{p1, p2}) - + svc.interval = time.Millisecond * 100 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -120,9 +120,9 @@ func TestService_Match(t *testing.T) { func TestService_Servers(t *testing.T) { p1 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - res := make(chan struct{}, 1) - res <- struct{}{} + EventsFunc: func(ctx context.Context) <-chan ProviderID { + res := make(chan ProviderID, 1) + res <- PIFile return res }, ListFunc: func() ([]URLMapper, error) { @@ -134,8 +134,8 @@ func TestService_Servers(t *testing.T) { }, } p2 := &ProviderMock{ - EventsFunc: func(ctx context.Context) <-chan struct{} { - return make(chan struct{}, 1) + EventsFunc: func(ctx context.Context) <-chan ProviderID { + return make(chan ProviderID, 1) }, ListFunc: func() ([]URLMapper, error) { return []URLMapper{ @@ -148,6 +148,7 @@ func TestService_Servers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() svc := NewService([]Provider{p1, p2}) + svc.interval = time.Millisecond * 100 err := svc.Run(ctx) require.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) diff --git a/app/discovery/provider/docker.go b/app/discovery/provider/docker.go index 36980b7..711d025 100644 --- a/app/discovery/provider/docker.go +++ b/app/discovery/provider/docker.go @@ -45,8 +45,8 @@ type containerInfo struct { } // Events gets eventsCh with all containers-related docker events events -func (d *Docker) Events(ctx context.Context) (res <-chan struct{}) { - eventsCh := make(chan struct{}) +func (d *Docker) Events(ctx context.Context) (res <-chan discovery.ProviderID) { + eventsCh := make(chan discovery.ProviderID) go func() { defer close(eventsCh) // loop over to recover from failed events call @@ -103,7 +103,7 @@ func (d *Docker) List() ([]discovery.URLMapper, error) { // activate starts blocking listener for all docker events // filters everything except "container" type, detects stop/start events and publishes signals to eventsCh -func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error { +func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan discovery.ProviderID) error { dockerEventsCh := make(chan *dc.APIEvents) err := client.AddEventListenerWithOptions(dc.EventsOptions{ Filters: map[string][]string{"type": {"container"}, "event": {"start", "die", "destroy", "restart", "pause"}}}, @@ -112,7 +112,7 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan return errors.Wrap(err, "can't add even listener") } - eventsCh <- struct{}{} // initial emmit + eventsCh <- discovery.PIDocker // initial emmit for { select { case <-ctx.Done(): @@ -128,8 +128,8 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan log.Printf("[DEBUG] container %s excluded", containerName) continue } - log.Printf("[INFO] new event %+v", ev) - eventsCh <- struct{}{} + log.Printf("[INFO] new docker event: container %s, status %s", containerName, ev.Status) + eventsCh <- discovery.PIDocker } } } diff --git a/app/discovery/provider/file.go b/app/discovery/provider/file.go index 09d422a..3bf20ff 100644 --- a/app/discovery/provider/file.go +++ b/app/discovery/provider/file.go @@ -22,13 +22,13 @@ type File struct { } // Events returns channel updating on file change only -func (d *File) Events(ctx context.Context) <-chan struct{} { - res := make(chan struct{}) +func (d *File) Events(ctx context.Context) <-chan discovery.ProviderID { + res := make(chan discovery.ProviderID) // no need to queue multiple events - trySubmit := func(ch chan struct{}) bool { + trySubmit := func(ch chan discovery.ProviderID) bool { select { - case ch <- struct{}{}: + case ch <- discovery.PIFile: return true default: return false diff --git a/app/discovery/provider/static.go b/app/discovery/provider/static.go index 9fab687..ba8a90e 100644 --- a/app/discovery/provider/static.go +++ b/app/discovery/provider/static.go @@ -2,6 +2,7 @@ package provider import ( "context" + "log" "regexp" "strings" @@ -16,9 +17,9 @@ type Static struct { } // Events returns channel updating once -func (s *Static) Events(_ context.Context) <-chan struct{} { - res := make(chan struct{}, 1) - res <- struct{}{} +func (s *Static) Events(_ context.Context) <-chan discovery.ProviderID { + res := make(chan discovery.ProviderID, 1) + res <- discovery.PIStatic return res } @@ -45,8 +46,12 @@ func (s *Static) List() (res []discovery.URLMapper, err error) { } for _, r := range s.Rules { + if strings.TrimSpace(r) == "" { + continue + } um, err := parse(r) if err != nil { + log.Printf("[DEBUG] invalid rule %s, %v", r, err) return nil, err } res = append(res, um) diff --git a/app/discovery/provider_mock.go b/app/discovery/provider_mock.go index 05b8ab1..a4a88ee 100644 --- a/app/discovery/provider_mock.go +++ b/app/discovery/provider_mock.go @@ -18,7 +18,7 @@ var _ Provider = &ProviderMock{} // // // make and configure a mocked Provider // mockedProvider := &ProviderMock{ -// EventsFunc: func(ctx context.Context) <-chan struct{} { +// EventsFunc: func(ctx context.Context) <-chan ProviderID { // panic("mock out the Events method") // }, // ListFunc: func() ([]URLMapper, error) { @@ -32,7 +32,7 @@ var _ Provider = &ProviderMock{} // } type ProviderMock struct { // EventsFunc mocks the Events method. - EventsFunc func(ctx context.Context) <-chan struct{} + EventsFunc func(ctx context.Context) <-chan ProviderID // ListFunc mocks the List method. ListFunc func() ([]URLMapper, error) @@ -53,7 +53,7 @@ type ProviderMock struct { } // Events calls EventsFunc. -func (mock *ProviderMock) Events(ctx context.Context) <-chan struct{} { +func (mock *ProviderMock) Events(ctx context.Context) <-chan ProviderID { if mock.EventsFunc == nil { panic("ProviderMock.EventsFunc: method is nil but Provider.Events was just called") }