update health state not sorting or reordering mappers

This commit is contained in:
nikolay.bystritskiy
2021-05-13 21:14:48 +02:00
committed by Umputun
parent 7bbb7073ba
commit 49515e307c
6 changed files with 255 additions and 207 deletions

View File

@@ -6,6 +6,8 @@ package discovery
import (
"context"
"fmt"
"net/http"
"regexp"
"sort"
"strings"
@@ -136,7 +138,10 @@ func (s *Service) Match(srv, src string) (string, MatchType, bool) {
case MTProxy:
dest := m.SrcMatch.ReplaceAllString(src, m.Dst)
if src != dest {
return dest, m.MatchType, true
if m.IsAlive() {
return dest, m.MatchType, true
}
return dest, m.MatchType, false
}
case MTStatic:
if src == m.AssetsWebRoot || strings.HasPrefix(src, m.AssetsWebRoot+"/") {
@@ -154,27 +159,27 @@ func (s *Service) ScheduleHealthCheck(ctx context.Context, interval time.Duratio
log.Printf("health-check scheduled every %s", interval)
go func() {
hloop:
ticker := time.NewTicker(interval)
for {
timer := time.NewTimer(interval)
select {
case <-timer.C:
s.lock.RLock()
cres := CheckHealth(s.Mappers())
s.lock.RUnlock()
case <-ticker.C:
pinged := s.CheckHealth()
// alive services would be picked up first
sort.SliceStable(cres.mappers, func(i, j int) bool {
return cres.mappers[j].dead
})
s.lock.Lock()
s.mappers = make(map[string][]URLMapper)
for _, m := range cres.mappers {
s.mappers[m.Server] = append(s.mappers[m.Server], m)
for _, mappers := range s.mappers {
for i := range mappers {
if err, ok := pinged[mappers[i].PingURL]; ok {
mappers[i].dead = false
if err != nil {
mappers[i].dead = true
}
}
}
}
s.lock.Unlock()
case <-ctx.Done():
break hloop
ticker.Stop()
return
}
}
}()
@@ -209,6 +214,64 @@ func (s *Service) Mappers() (mappers []URLMapper) {
return mappers
}
// CheckHealth starts health-check for service's mappers
func (s *Service) CheckHealth() (pingResult map[string]error) {
s.lock.RLock()
defer s.lock.RUnlock()
const concurrent = 8
sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls
// runs pings in parallel
type pingError struct {
pingURL string
err error
}
outCh := make(chan pingError, concurrent)
services, pinged := 0, 0
var wg sync.WaitGroup
for _, mappers := range s.mappers {
for _, m := range mappers {
if m.MatchType != MTProxy {
continue
}
services++
if m.PingURL == "" {
continue
}
pinged++
wg.Add(1)
go func(m URLMapper) {
sema <- struct{}{}
defer func() {
<-sema
wg.Done()
}()
errMsg, err := m.ping()
if err != nil {
log.Print(errMsg)
}
outCh <- pingError{m.PingURL, err}
}(m)
}
}
go func() {
wg.Wait()
close(outCh)
}()
pingResult = make(map[string]error)
for res := range outCh {
pingResult[res.pingURL] = res.err
}
return pingResult
}
func (s *Service) mergeLists() (res []URLMapper) {
for _, p := range s.providers {
lst, err := p.List()
@@ -313,3 +376,24 @@ func Contains(e string, s []string) bool {
}
return false
}
func (m URLMapper) IsAlive() bool {
return !m.dead
}
func (m URLMapper) ping() (string, error) {
client := http.Client{Timeout: 500 * time.Millisecond}
resp, err := client.Get(m.PingURL)
if err != nil {
errMsg := strings.Replace(err.Error(), "\"", "", -1)
errMsg = fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg)
return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg)
}
if resp.StatusCode != http.StatusOK {
errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status)
}
return "", err
}

View File

@@ -86,6 +86,10 @@ func TestService_Match(t *testing.T) {
{SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1", ProviderID: PIFile},
{Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"),
Dst: "http://127.0.0.2:8080/blah2/$1/abc", ProviderID: PIFile},
{Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc4/(.*)"),
Dst: "http://127.0.0.4:8080/blah2/$1/abc", MatchType: MTProxy, dead: true},
{Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc5/(.*)"),
Dst: "http://127.0.0.5:8080/blah2/$1/abc", MatchType: MTProxy, dead: false},
}, nil
},
}
@@ -111,7 +115,7 @@ func TestService_Match(t *testing.T) {
err := svc.Run(ctx)
require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
assert.Equal(t, 6, len(svc.Mappers()))
assert.Equal(t, 8, len(svc.Mappers()))
tbl := []struct {
server, src string
@@ -126,6 +130,8 @@ func TestService_Match(t *testing.T) {
{"zzz.example.com", "/aaa/api/svc1/1234", "/aaa/api/svc1/1234", MTProxy, false},
{"m.example.com", "/api/svc2/1234", "http://127.0.0.2:8080/blah2/1234/abc", MTProxy, true},
{"m1.example.com", "/api/svc2/1234", "/api/svc2/1234", MTProxy, false},
{"m.example.com", "/api/svc4/id12345", "http://127.0.0.4:8080/blah2/id12345/abc", MTProxy, false},
{"m.example.com", "/api/svc5/num123456", "http://127.0.0.5:8080/blah2/num123456/abc", MTProxy, true},
{"m1.example.com", "/web/index.html", "/web:/var/web/", MTStatic, true},
{"m1.example.com", "/web/", "/web:/var/web/", MTStatic, true},
{"m1.example.com", "/www/something", "/www:/var/web/", MTStatic, true},
@@ -274,6 +280,131 @@ func TestService_ScheduleHealthCheck(t *testing.T) {
mappers = svc.Mappers()
assert.Equal(t, false, mappers[0].dead)
assert.Equal(t, false, mappers[1].dead)
assert.Equal(t, true, mappers[2].dead)
assert.Equal(t, true, mappers[1].dead)
assert.Equal(t, false, mappers[2].dead)
}
func Test_ping(t *testing.T) {
port := rand.Intn(10000) + 40000
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts.Close()
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
}))
defer ts2.Close()
type args struct {
m URLMapper
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{name: "test server, expected OK", args: args{m: URLMapper{PingURL: ts.URL}}, want: "", wantErr: false},
{name: "random port, expected error", args: args{m: URLMapper{PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}, want: "", wantErr: true},
{name: "error code != 200", args: args{m: URLMapper{PingURL: ts2.URL}}, want: "", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := tt.args.m.ping()
if (err != nil) != tt.wantErr {
t.Errorf("ping() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}
func TestCheckHealth(t *testing.T) {
port := rand.Intn(10000) + 40000
failPingULR := fmt.Sprintf("http://127.0.0.1:%d", port)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts.Close()
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts2.Close()
p1 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan ProviderID {
res := make(chan ProviderID, 1)
res <- PIFile
return res
},
ListFunc: func() ([]URLMapper, error) {
return []URLMapper{
{Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1",
ProviderID: PIFile, PingURL: ts.URL},
{Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"),
Dst: "http://127.0.0.2:8080/blah2/@1/abc", ProviderID: PIFile, PingURL: ts2.URL},
}, nil
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan ProviderID {
return make(chan ProviderID, 1)
},
ListFunc: func() ([]URLMapper, error) {
return []URLMapper{
{Server: "localhost", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"),
Dst: "http://127.0.0.3:8080/blah3/xyz", ProviderID: PIDocker, PingURL: failPingULR},
}, nil
},
}
p3 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan ProviderID {
return make(chan ProviderID, 1)
},
ListFunc: func() ([]URLMapper, error) {
return nil, errors.New("failed")
},
}
svc := NewService([]Provider{p1, p2, p3}, time.Millisecond*10)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_ = svc.Run(ctx)
mappers := svc.Mappers()
fmt.Println(mappers)
tests := []struct {
name string
want map[string]error
}{
{name: "case 1",
want: map[string]error{
ts.URL: nil,
ts2.URL: nil,
failPingULR: fmt.Errorf("some error"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := svc.CheckHealth()
for pingURL, err := range got {
wantErr, ok := tt.want[pingURL]
if !ok {
t.Errorf("CheckHealth() = ping URL %s not found in test case", pingURL)
continue
}
if (err != nil && wantErr == nil) ||
(err == nil && wantErr != nil) {
t.Errorf("CheckHealth() error = %v, wantErr %v", err, wantErr)
}
}
})
}
}

View File

@@ -1,99 +0,0 @@
package discovery
import (
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
)
// CheckResult is result of health-check
type CheckResult struct {
Ok bool
Valid int
Total int
Errs []string
mappers []URLMapper
}
// CheckHealth starts health-check for service's mappers
func CheckHealth(mappers []URLMapper) CheckResult {
const concurrent = 8
sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls
// runs pings in parallel
type mapperError struct {
mapper URLMapper
err error
}
outCh := make(chan mapperError, concurrent)
services, pinged := 0, 0
var wg sync.WaitGroup
for _, m := range mappers {
if m.MatchType != MTProxy {
continue
}
services++
if m.PingURL == "" {
continue
}
pinged++
wg.Add(1)
go func(m URLMapper) {
sema <- struct{}{}
defer func() {
<-sema
wg.Done()
}()
m.dead = false
errMsg, err := ping(m)
if err != nil {
m.dead = true
log.Print(errMsg)
}
outCh <- mapperError{m, err}
}(m)
}
go func() {
wg.Wait()
close(outCh)
}()
res := CheckResult{}
for m := range outCh {
if m.err != nil {
res.Errs = append(res.Errs, m.err.Error())
}
res.mappers = append(res.mappers, m.mapper)
}
res.Ok = len(res.Errs) == 0
res.Valid = pinged - len(res.Errs)
res.Total = services
return res
}
func ping(m URLMapper) (string, error) {
client := http.Client{Timeout: 500 * time.Millisecond}
resp, err := client.Get(m.PingURL)
if err != nil {
errMsg := strings.Replace(err.Error(), "\"", "", -1)
errMsg = fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg)
return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg)
}
if resp.StatusCode != http.StatusOK {
errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status)
}
return "", err
}

View File

@@ -1,86 +0,0 @@
package discovery
import (
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"testing"
)
func Test_ping(t *testing.T) {
port := rand.Intn(10000) + 40000
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts.Close()
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
}))
defer ts2.Close()
type args struct {
m URLMapper
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{name: "test server, expected OK", args: args{m: URLMapper{PingURL: ts.URL}}, want: "", wantErr: false},
{name: "random port, expected error", args: args{m: URLMapper{PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}, want: "", wantErr: true},
{name: "error code != 200", args: args{m: URLMapper{PingURL: ts2.URL}}, want: "", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := ping(tt.args.m)
if (err != nil) != tt.wantErr {
t.Errorf("ping() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}
func TestCheckHealth(t *testing.T) {
port := rand.Intn(10000) + 40000
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts.Close()
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer ts2.Close()
type args struct {
mappers []URLMapper
}
tests := []struct {
name string
args args
want CheckResult
}{
{name: "case 1", args: args{mappers: []URLMapper{{PingURL: ts.URL}, {PingURL: ts2.URL}, {PingURL: fmt.Sprintf("127.0.0.1:%d", port)}}},
want: CheckResult{Ok: false, Total: 3, Valid: 2, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false},
{PingURL: fmt.Sprintf("127.0.0.1:%d", port), dead: true}}}},
{name: "case 2", args: args{mappers: []URLMapper{{PingURL: ts.URL}, {PingURL: ts2.URL}}},
want: CheckResult{Ok: true, Total: 2, Valid: 2, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false}}}},
{name: "case 3", args: args{mappers: []URLMapper{{PingURL: ts.URL, MatchType: MTStatic}, {PingURL: ts2.URL}}},
want: CheckResult{Ok: true, Total: 1, Valid: 1, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: false}}}},
{name: "case 4", args: args{mappers: []URLMapper{{}, {PingURL: ts2.URL}}},
want: CheckResult{Ok: true, Total: 2, Valid: 1, mappers: []URLMapper{{PingURL: ts.URL, dead: false}, {PingURL: ts2.URL, dead: true}}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := CheckHealth(tt.args.mappers)
got.Errs = got.Errs[:0]
if got.Ok != tt.want.Ok || got.Total != tt.want.Total || got.Valid != tt.want.Valid {
t.Errorf("CheckHealth() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -23,8 +23,25 @@ func (h *Http) healthMiddleware(next http.Handler) http.Handler {
func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
res := discovery.CheckHealth(h.Mappers())
if !res.Ok {
mappers := h.Mappers()
total := 0
for _, m := range mappers {
if m.MatchType == discovery.MTProxy {
total++
}
}
pingRes := h.CheckHealth()
var errs []string
for _, pingErr := range pingRes {
if pingErr != nil {
errs = append(errs, pingErr.Error())
}
}
if len(errs) > 0 {
w.WriteHeader(http.StatusExpectationFailed)
errResp := struct {
@@ -33,13 +50,13 @@ func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) {
Passed int `json:"passed,omitempty"`
Failed int `json:"failed,omitempty"`
Errors []string `json:"errors,omitempty"`
}{Status: "failed", Services: res.Total, Passed: res.Valid, Failed: len(res.Errs), Errors: res.Errs}
}{Status: "failed", Services: total, Passed: len(pingRes) - len(errs), Failed: len(errs), Errors: errs}
rest.RenderJSON(w, errResp)
return
}
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, res.Valid)
_, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, total)
if err != nil {
log.Printf("[WARN] failed to send health, %v", err)
}

View File

@@ -47,6 +47,7 @@ type Matcher interface {
Match(srv, src string) (string, discovery.MatchType, bool)
Servers() (servers []string)
Mappers() (mappers []discovery.URLMapper)
CheckHealth() (pingResult map[string]error)
}
// MiddlewareProvider interface defines http middleware handler