mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Fix race in framer and improperly returned err
Fixes #3342 Two bugs were fixed: * Closing the StreamFramer's exitCh before setting the error means other goroutines blocked on exitCh closing could see the error as nil. This was *not* observered. * parseFramerError on Windows would fall through and return an improperly captured nil err variable. There's no need for parseFramerError to be a closure which fixes the confusion.
This commit is contained in:
@@ -316,7 +316,7 @@ type StreamFramer struct {
|
||||
// Captures whether the framer is running and any error that occurred to
|
||||
// cause it to stop.
|
||||
running bool
|
||||
Err error
|
||||
err error
|
||||
}
|
||||
|
||||
// NewStreamFramer creates a new stream framer that will output StreamFrames to
|
||||
@@ -379,16 +379,23 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
|
||||
return s.exitCh
|
||||
}
|
||||
|
||||
// Err returns the error that caused the StreamFramer to exit
|
||||
func (s *StreamFramer) Err() error {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
return s.err
|
||||
}
|
||||
|
||||
// run is the internal run method. It exits if Destroy is called or an error
|
||||
// occurs, in which case the exit channel is closed.
|
||||
func (s *StreamFramer) run() {
|
||||
var err error
|
||||
defer func() {
|
||||
close(s.exitCh)
|
||||
s.l.Lock()
|
||||
s.running = false
|
||||
s.Err = err
|
||||
s.err = err
|
||||
s.l.Unlock()
|
||||
close(s.exitCh)
|
||||
}()
|
||||
|
||||
OUTER:
|
||||
@@ -466,8 +473,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
||||
// If we are not running, return the error that caused us to not run or
|
||||
// indicated that it was never started.
|
||||
if !s.running {
|
||||
if s.Err != nil {
|
||||
return s.Err
|
||||
if s.err != nil {
|
||||
return s.err
|
||||
}
|
||||
|
||||
return fmt.Errorf("StreamFramer not running")
|
||||
@@ -608,6 +615,26 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// parseFramerErr takes an error and returns an error. The error will
|
||||
// potentially change if it was caused by the connection being closed.
|
||||
func parseFramerErr(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), io.ErrClosedPipe.Error()) {
|
||||
// The pipe check is for tests
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
// The connection was closed by our peer
|
||||
if strings.Contains(err.Error(), syscall.EPIPE.Error()) || strings.Contains(err.Error(), syscall.ECONNRESET.Error()) {
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// stream is the internal method to stream the content of a file. eofCancelCh is
|
||||
// used to cancel the stream if triggered while at EOF. If the connection is
|
||||
// broken an EPIPE error is returned
|
||||
@@ -629,26 +656,6 @@ func (s *HTTPServer) stream(offset int64, path string,
|
||||
t.Done()
|
||||
}()
|
||||
|
||||
// parseFramerErr takes an error and returns an error. The error will
|
||||
// potentially change if it was caused by the connection being closed.
|
||||
parseFramerErr := func(e error) error {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
|
||||
// The pipe check is for tests
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
// The connection was closed by our peer
|
||||
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a variable to allow setting the last event
|
||||
var lastEvent string
|
||||
|
||||
@@ -721,7 +728,7 @@ OUTER:
|
||||
lastEvent = truncateEvent
|
||||
continue OUTER
|
||||
case <-framer.ExitCh():
|
||||
return parseFramerErr(framer.Err)
|
||||
return parseFramerErr(framer.Err())
|
||||
case err, ok := <-eofCancelCh:
|
||||
if !ok {
|
||||
return nil
|
||||
@@ -916,7 +923,7 @@ func (s *HTTPServer) logs(follow, plain bool, offset int64,
|
||||
return nil
|
||||
}
|
||||
|
||||
//Since we successfully streamed, update the overall offset/idx.
|
||||
// Since we successfully streamed, update the overall offset/idx.
|
||||
offset = int64(0)
|
||||
nextIdx = idx + 1
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user