diff --git a/vendor/github.com/hashicorp/yamux/.gitignore b/vendor/github.com/hashicorp/yamux/.gitignore deleted file mode 100644 index 836562412..000000000 --- a/vendor/github.com/hashicorp/yamux/.gitignore +++ /dev/null @@ -1,23 +0,0 @@ -# Compiled Object files, Static and Dynamic libs (Shared Objects) -*.o -*.a -*.so - -# Folders -_obj -_test - -# Architecture specific extensions/prefixes -*.[568vq] -[568vq].out - -*.cgo1.go -*.cgo2.c -_cgo_defun.c -_cgo_gotypes.go -_cgo_export.* - -_testmain.go - -*.exe -*.test diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go index 20369bdd9..e17981839 100644 --- a/vendor/github.com/hashicorp/yamux/session.go +++ b/vendor/github.com/hashicorp/yamux/session.go @@ -46,8 +46,11 @@ type Session struct { pingID uint32 pingLock sync.Mutex - // streams maps a stream id to a stream + // streams maps a stream id to a stream, and inflight has an entry + // for any outgoing stream that has not yet been established. Both are + // protected by streamLock. streams map[uint32]*Stream + inflight map[uint32]struct{} streamLock sync.Mutex // synCh acts like a semaphore. It is sized to the AcceptBacklog which @@ -90,6 +93,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { bufRead: bufio.NewReader(conn), pings: make(map[uint32]chan struct{}), streams: make(map[uint32]*Stream), + inflight: make(map[uint32]struct{}), synCh: make(chan struct{}, config.AcceptBacklog), acceptCh: make(chan *Stream, config.AcceptBacklog), sendCh: make(chan sendReady, 64), @@ -153,7 +157,7 @@ func (s *Session) OpenStream() (*Stream, error) { } GET_ID: - // Get and ID, and check for stream exhaustion + // Get an ID, and check for stream exhaustion id := atomic.LoadUint32(&s.nextStreamID) if id >= math.MaxUint32-1 { return nil, ErrStreamsExhausted @@ -166,10 +170,16 @@ GET_ID: stream := newStream(s, id, streamInit) s.streamLock.Lock() s.streams[id] = stream + s.inflight[id] = struct{}{} s.streamLock.Unlock() // Send the window update to create if err := stream.sendWindowUpdate(); err != nil { + select { + case <-s.synCh: + default: + s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") + } return nil, err } return stream, nil @@ -580,19 +590,34 @@ func (s *Session) incomingStream(id uint32) error { } // closeStream is used to close a stream once both sides have -// issued a close. +// issued a close. If there was an in-flight SYN and the stream +// was not yet established, then this will give the credit back. func (s *Session) closeStream(id uint32) { s.streamLock.Lock() + if _, ok := s.inflight[id]; ok { + select { + case <-s.synCh: + default: + s.logger.Printf("[ERR] yamux: SYN tracking out of sync") + } + } delete(s.streams, id) s.streamLock.Unlock() } // establishStream is used to mark a stream that was in the // SYN Sent state as established. -func (s *Session) establishStream() { +func (s *Session) establishStream(id uint32) { + s.streamLock.Lock() + if _, ok := s.inflight[id]; ok { + delete(s.inflight, id) + } else { + s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") + } select { case <-s.synCh: default: - panic("established stream without inflight syn") + s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") } + s.streamLock.Unlock() } diff --git a/vendor/github.com/hashicorp/yamux/spec.md b/vendor/github.com/hashicorp/yamux/spec.md index 419470b7f..b74dd6451 100644 --- a/vendor/github.com/hashicorp/yamux/spec.md +++ b/vendor/github.com/hashicorp/yamux/spec.md @@ -96,7 +96,7 @@ Because we are relying on the reliable stream underneath, a connection can begin sending data once the SYN flag is sent. The corresponding ACK does not need to be received. This is particularly well suited for an RPC system where a client wants to open a stream and immediately -fire a request without wiating for the RTT of the ACK. +fire a request without waiting for the RTT of the ACK. This does introduce the possibility of a connection being rejected after data has been sent already. This is a slight semantic difference diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go index 4c3242d3e..ff005ebda 100644 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ b/vendor/github.com/hashicorp/yamux/stream.go @@ -327,7 +327,7 @@ func (s *Stream) processFlags(flags uint16) error { if s.state == streamSYNSent { s.state = streamEstablished } - s.session.establishStream() + s.session.establishStream(s.id) } if flags&flagFIN == flagFIN { switch s.state { @@ -348,9 +348,6 @@ func (s *Stream) processFlags(flags uint16) error { } } if flags&flagRST == flagRST { - if s.state == streamSYNSent { - s.session.establishStream() - } s.state = streamReset closeStream = true s.notifyWaiting() diff --git a/vendor/vendor.json b/vendor/vendor.json index 066964e92..46be5d8a9 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -459,8 +459,10 @@ "revision": "c4c55f16bae1aed9b355ad655d3ebf0215734461" }, { + "checksumSHA1": "xvxetwF2G1XHScrmo8EM3yisjBc=", "path": "github.com/hashicorp/yamux", - "revision": "df949784da9ed028ee76df44652e42d37a09d7e4" + "revision": "172cde3b6ca5c154ff4e6e2ef96b7451332a9946", + "revisionTime": "2016-05-19T16:00:42Z" }, { "comment": "0.2.2-2-gc01cf91",