better flush and connection closed handling

This commit is contained in:
Alex Dadgar
2016-07-22 16:02:46 -07:00
parent 67fe934342
commit 0b9449fe7e

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
@@ -312,9 +313,9 @@ func (s *StreamFramer) run() {
var err error
defer func() {
close(s.exitCh)
close(s.outbound)
s.l.Lock()
close(s.outbound)
s.Err = err
s.running = false
s.l.Unlock()
@@ -342,7 +343,7 @@ func (s *StreamFramer) run() {
select {
case s.outbound <- s.f:
s.f = nil
default:
case <-s.exitCh:
}
s.l.Unlock()
case <-s.heartbeat.C:
@@ -369,13 +370,17 @@ OUTER:
}
// Flush any existing frames
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
return
FLUSH:
for {
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
return
}
default:
break FLUSH
}
default:
}
s.l.Lock()
@@ -602,10 +607,20 @@ OUTER:
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
// Check if the connection has been closed
if err == io.ErrClosedPipe || strings.Contains(err.Error(), syscall.EPIPE.Error()) {
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}
operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}
return err
}
}
@@ -813,7 +828,7 @@ func (s *HTTPServer) logs(follow bool, offset int64,
}
// Check if the connection was closed
if strings.Contains(err.Error(), syscall.EPIPE.Error()) {
if err == syscall.EPIPE {
return nil
}