From 49515e307c690e3c4165f2a62b022fcd5e1f6983 Mon Sep 17 00:00:00 2001 From: "nikolay.bystritskiy" Date: Thu, 13 May 2021 21:14:48 +0200 Subject: [PATCH] update health state not sorting or reordering mappers --- app/discovery/discovery.go | 114 ++++++++++++++++++++++---- app/discovery/discovery_test.go | 137 +++++++++++++++++++++++++++++++- app/discovery/health.go | 99 ----------------------- app/discovery/health_test.go | 86 -------------------- app/proxy/health.go | 25 +++++- app/proxy/proxy.go | 1 + 6 files changed, 255 insertions(+), 207 deletions(-) delete mode 100644 app/discovery/health.go delete mode 100644 app/discovery/health_test.go diff --git a/app/discovery/discovery.go b/app/discovery/discovery.go index 082953e..a21a582 100644 --- a/app/discovery/discovery.go +++ b/app/discovery/discovery.go @@ -6,6 +6,8 @@ package discovery import ( "context" + "fmt" + "net/http" "regexp" "sort" "strings" @@ -136,7 +138,10 @@ func (s *Service) Match(srv, src string) (string, MatchType, bool) { case MTProxy: dest := m.SrcMatch.ReplaceAllString(src, m.Dst) if src != dest { - return dest, m.MatchType, true + if m.IsAlive() { + return dest, m.MatchType, true + } + return dest, m.MatchType, false } case MTStatic: if src == m.AssetsWebRoot || strings.HasPrefix(src, m.AssetsWebRoot+"/") { @@ -154,27 +159,27 @@ func (s *Service) ScheduleHealthCheck(ctx context.Context, interval time.Duratio log.Printf("health-check scheduled every %s", interval) go func() { - hloop: + ticker := time.NewTicker(interval) for { - timer := time.NewTimer(interval) select { - case <-timer.C: - s.lock.RLock() - cres := CheckHealth(s.Mappers()) - s.lock.RUnlock() + case <-ticker.C: + pinged := s.CheckHealth() - // alive services would be picked up first - sort.SliceStable(cres.mappers, func(i, j int) bool { - return cres.mappers[j].dead - }) s.lock.Lock() - s.mappers = make(map[string][]URLMapper) - for _, m := range cres.mappers { - s.mappers[m.Server] = append(s.mappers[m.Server], m) + for _, mappers := range s.mappers { + for i := range mappers { + if err, ok := pinged[mappers[i].PingURL]; ok { + mappers[i].dead = false + if err != nil { + mappers[i].dead = true + } + } + } } s.lock.Unlock() case <-ctx.Done(): - break hloop + ticker.Stop() + return } } }() @@ -209,6 +214,64 @@ func (s *Service) Mappers() (mappers []URLMapper) { return mappers } +// CheckHealth starts health-check for service's mappers +func (s *Service) CheckHealth() (pingResult map[string]error) { + s.lock.RLock() + defer s.lock.RUnlock() + + const concurrent = 8 + sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls + + // runs pings in parallel + type pingError struct { + pingURL string + err error + } + outCh := make(chan pingError, concurrent) + + services, pinged := 0, 0 + var wg sync.WaitGroup + for _, mappers := range s.mappers { + for _, m := range mappers { + if m.MatchType != MTProxy { + continue + } + services++ + if m.PingURL == "" { + continue + } + pinged++ + wg.Add(1) + + go func(m URLMapper) { + sema <- struct{}{} + defer func() { + <-sema + wg.Done() + }() + + errMsg, err := m.ping() + if err != nil { + log.Print(errMsg) + } + outCh <- pingError{m.PingURL, err} + }(m) + } + } + + go func() { + wg.Wait() + close(outCh) + }() + + pingResult = make(map[string]error) + for res := range outCh { + pingResult[res.pingURL] = res.err + } + + return pingResult +} + func (s *Service) mergeLists() (res []URLMapper) { for _, p := range s.providers { lst, err := p.List() @@ -313,3 +376,24 @@ func Contains(e string, s []string) bool { } return false } + +func (m URLMapper) IsAlive() bool { + return !m.dead +} + +func (m URLMapper) ping() (string, error) { + client := http.Client{Timeout: 500 * time.Millisecond} + + resp, err := client.Get(m.PingURL) + if err != nil { + errMsg := strings.Replace(err.Error(), "\"", "", -1) + errMsg = fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg) + return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg) + } + if resp.StatusCode != http.StatusOK { + errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status) + return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status) + } + + return "", err +} diff --git a/app/discovery/discovery_test.go b/app/discovery/discovery_test.go index 41005fc..fab6de3 100644 --- a/app/discovery/discovery_test.go +++ b/app/discovery/discovery_test.go @@ -86,6 +86,10 @@ func TestService_Match(t *testing.T) { {SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1", ProviderID: PIFile}, {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc", ProviderID: PIFile}, + {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc4/(.*)"), + Dst: "http://127.0.0.4:8080/blah2/$1/abc", MatchType: MTProxy, dead: true}, + {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc5/(.*)"), + Dst: "http://127.0.0.5:8080/blah2/$1/abc", MatchType: MTProxy, dead: false}, }, nil }, } @@ -111,7 +115,7 @@ func TestService_Match(t *testing.T) { err := svc.Run(ctx) require.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - assert.Equal(t, 6, len(svc.Mappers())) + assert.Equal(t, 8, len(svc.Mappers())) tbl := []struct { server, src string @@ -126,6 +130,8 @@ func TestService_Match(t *testing.T) { {"zzz.example.com", "/aaa/api/svc1/1234", "/aaa/api/svc1/1234", MTProxy, false}, {"m.example.com", "/api/svc2/1234", "http://127.0.0.2:8080/blah2/1234/abc", MTProxy, true}, {"m1.example.com", "/api/svc2/1234", "/api/svc2/1234", MTProxy, false}, + {"m.example.com", "/api/svc4/id12345", "http://127.0.0.4:8080/blah2/id12345/abc", MTProxy, false}, + {"m.example.com", "/api/svc5/num123456", "http://127.0.0.5:8080/blah2/num123456/abc", MTProxy, true}, {"m1.example.com", "/web/index.html", "/web:/var/web/", MTStatic, true}, {"m1.example.com", "/web/", "/web:/var/web/", MTStatic, true}, {"m1.example.com", "/www/something", "/www:/var/web/", MTStatic, true}, @@ -274,6 +280,131 @@ func TestService_ScheduleHealthCheck(t *testing.T) { mappers = svc.Mappers() assert.Equal(t, false, mappers[0].dead) - assert.Equal(t, false, mappers[1].dead) - assert.Equal(t, true, mappers[2].dead) + assert.Equal(t, true, mappers[1].dead) + assert.Equal(t, false, mappers[2].dead) +} + +func Test_ping(t *testing.T) { + port := rand.Intn(10000) + 40000 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("OK")) + })) + defer ts.Close() + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + })) + defer ts2.Close() + + type args struct { + m URLMapper + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + {name: "test server, expected OK", args: args{m: URLMapper{PingURL: ts.URL}}, want: "", wantErr: false}, + {name: "random port, expected error", args: args{m: URLMapper{PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}, want: "", wantErr: true}, + {name: "error code != 200", args: args{m: URLMapper{PingURL: ts2.URL}}, want: "", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := tt.args.m.ping() + if (err != nil) != tt.wantErr { + t.Errorf("ping() error = %v, wantErr %v", err, tt.wantErr) + return + } + }) + } +} + +func TestCheckHealth(t *testing.T) { + port := rand.Intn(10000) + 40000 + failPingULR := fmt.Sprintf("http://127.0.0.1:%d", port) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("OK")) + })) + defer ts.Close() + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("OK")) + })) + defer ts2.Close() + + p1 := &ProviderMock{ + EventsFunc: func(ctx context.Context) <-chan ProviderID { + res := make(chan ProviderID, 1) + res <- PIFile + return res + }, + ListFunc: func() ([]URLMapper, error) { + return []URLMapper{ + {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1", + ProviderID: PIFile, PingURL: ts.URL}, + {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), + Dst: "http://127.0.0.2:8080/blah2/@1/abc", ProviderID: PIFile, PingURL: ts2.URL}, + }, nil + }, + } + p2 := &ProviderMock{ + EventsFunc: func(ctx context.Context) <-chan ProviderID { + return make(chan ProviderID, 1) + }, + ListFunc: func() ([]URLMapper, error) { + return []URLMapper{ + {Server: "localhost", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), + Dst: "http://127.0.0.3:8080/blah3/xyz", ProviderID: PIDocker, PingURL: failPingULR}, + }, nil + }, + } + + p3 := &ProviderMock{ + EventsFunc: func(ctx context.Context) <-chan ProviderID { + return make(chan ProviderID, 1) + }, + ListFunc: func() ([]URLMapper, error) { + return nil, errors.New("failed") + }, + } + + svc := NewService([]Provider{p1, p2, p3}, time.Millisecond*10) + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _ = svc.Run(ctx) + mappers := svc.Mappers() + fmt.Println(mappers) + + tests := []struct { + name string + want map[string]error + }{ + {name: "case 1", + want: map[string]error{ + ts.URL: nil, + ts2.URL: nil, + failPingULR: fmt.Errorf("some error"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := svc.CheckHealth() + for pingURL, err := range got { + wantErr, ok := tt.want[pingURL] + if !ok { + t.Errorf("CheckHealth() = ping URL %s not found in test case", pingURL) + continue + } + + if (err != nil && wantErr == nil) || + (err == nil && wantErr != nil) { + t.Errorf("CheckHealth() error = %v, wantErr %v", err, wantErr) + } + } + }) + } } diff --git a/app/discovery/health.go b/app/discovery/health.go deleted file mode 100644 index d2d5e52..0000000 --- a/app/discovery/health.go +++ /dev/null @@ -1,99 +0,0 @@ -package discovery - -import ( - "fmt" - "log" - "net/http" - "strings" - "sync" - "time" -) - -// CheckResult is result of health-check -type CheckResult struct { - Ok bool - Valid int - Total int - Errs []string - mappers []URLMapper -} - -// CheckHealth starts health-check for service's mappers -func CheckHealth(mappers []URLMapper) CheckResult { - const concurrent = 8 - sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls - - // runs pings in parallel - type mapperError struct { - mapper URLMapper - err error - } - outCh := make(chan mapperError, concurrent) - - services, pinged := 0, 0 - var wg sync.WaitGroup - for _, m := range mappers { - if m.MatchType != MTProxy { - continue - } - services++ - if m.PingURL == "" { - continue - } - pinged++ - wg.Add(1) - - go func(m URLMapper) { - sema <- struct{}{} - defer func() { - <-sema - wg.Done() - }() - - m.dead = false - errMsg, err := ping(m) - if err != nil { - m.dead = true - log.Print(errMsg) - } - outCh <- mapperError{m, err} - }(m) - } - - go func() { - wg.Wait() - close(outCh) - }() - - res := CheckResult{} - - for m := range outCh { - if m.err != nil { - res.Errs = append(res.Errs, m.err.Error()) - } - res.mappers = append(res.mappers, m.mapper) - } - - res.Ok = len(res.Errs) == 0 - res.Valid = pinged - len(res.Errs) - res.Total = services - - return res -} - -func ping(m URLMapper) (string, error) { - client := http.Client{Timeout: 500 * time.Millisecond} - - resp, err := client.Get(m.PingURL) - if err != nil { - errMsg := strings.Replace(err.Error(), "\"", "", -1) - errMsg = fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg) - return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg) - } - if resp.StatusCode != http.StatusOK { - errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status) - return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status) - } - - return "", err -} diff --git a/app/discovery/health_test.go b/app/discovery/health_test.go deleted file mode 100644 index 1726b9c..0000000 --- a/app/discovery/health_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package discovery - -import ( - "fmt" - "math/rand" - "net/http" - "net/http/httptest" - "testing" -) - -func Test_ping(t *testing.T) { - port := rand.Intn(10000) + 40000 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("OK")) - })) - defer ts.Close() - - ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(500) - })) - defer ts2.Close() - - type args struct { - m URLMapper - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - {name: "test server, expected OK", args: args{m: URLMapper{PingURL: ts.URL}}, want: "", wantErr: false}, - {name: "random port, expected error", args: args{m: URLMapper{PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}, want: "", wantErr: true}, - {name: "error code != 200", args: args{m: URLMapper{PingURL: ts2.URL}}, want: "", wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := ping(tt.args.m) - if (err != nil) != tt.wantErr { - t.Errorf("ping() error = %v, wantErr %v", err, tt.wantErr) - return - } - }) - } -} - -func TestCheckHealth(t *testing.T) { - port := rand.Intn(10000) + 40000 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("OK")) - })) - defer ts.Close() - - ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("OK")) - })) - defer ts2.Close() - - type args struct { - mappers []URLMapper - } - tests := []struct { - name string - args args - want CheckResult - }{ - {name: "case 1", args: args{mappers: []URLMapper{{PingURL: ts.URL}, {PingURL: ts2.URL}, {PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}}, - want: CheckResult{Ok: false, Total: 3, Valid: 2, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false}, - {PingURL: fmt.Sprintf("127.0.0.1:%d", port), dead: true}}}}, - {name: "case 2", args: args{mappers: []URLMapper{{PingURL: ts.URL}, {PingURL: ts2.URL}}}, - want: CheckResult{Ok: true, Total: 2, Valid: 2, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false}}}}, - {name: "case 3", args: args{mappers: []URLMapper{{PingURL: ts.URL, MatchType: MTStatic}, {PingURL: ts2.URL}}}, - want: CheckResult{Ok: true, Total: 1, Valid: 1, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false}}}}, - {name: "case 4", args: args{mappers: []URLMapper{{}, {PingURL: ts2.URL}}}, - want: CheckResult{Ok: true, Total: 2, Valid: 1, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: true}}}}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := CheckHealth(tt.args.mappers) - got.Errs = got.Errs[:0] - if got.Ok != tt.want.Ok || got.Total != tt.want.Total || got.Valid != tt.want.Valid { - t.Errorf("CheckHealth() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/app/proxy/health.go b/app/proxy/health.go index 67654d3..cbf292b 100644 --- a/app/proxy/health.go +++ b/app/proxy/health.go @@ -23,8 +23,25 @@ func (h *Http) healthMiddleware(next http.Handler) http.Handler { func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") - res := discovery.CheckHealth(h.Mappers()) - if !res.Ok { + + mappers := h.Mappers() + total := 0 + for _, m := range mappers { + if m.MatchType == discovery.MTProxy { + total++ + } + } + + pingRes := h.CheckHealth() + + var errs []string + for _, pingErr := range pingRes { + if pingErr != nil { + errs = append(errs, pingErr.Error()) + } + } + + if len(errs) > 0 { w.WriteHeader(http.StatusExpectationFailed) errResp := struct { @@ -33,13 +50,13 @@ func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) { Passed int `json:"passed,omitempty"` Failed int `json:"failed,omitempty"` Errors []string `json:"errors,omitempty"` - }{Status: "failed", Services: res.Total, Passed: res.Valid, Failed: len(res.Errs), Errors: res.Errs} + }{Status: "failed", Services: total, Passed: len(pingRes) - len(errs), Failed: len(errs), Errors: errs} rest.RenderJSON(w, errResp) return } w.WriteHeader(http.StatusOK) - _, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, res.Valid) + _, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, total) if err != nil { log.Printf("[WARN] failed to send health, %v", err) } diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index 04720ab..bec4e4e 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -47,6 +47,7 @@ type Matcher interface { Match(srv, src string) (string, discovery.MatchType, bool) Servers() (servers []string) Mappers() (mappers []discovery.URLMapper) + CheckHealth() (pingResult map[string]error) } // MiddlewareProvider interface defines http middleware handler