chore: Buffer remote candidates like local (#77)

* chore: Buffer remote candidates like local

Buffering was previously added for local candidates, and the same is
required for remote candidates to prevent a race where they are added
before negotiation is complete.

I removed the mutex earlier because it caused a different race. I
didn't realize the remote candidates wouldn't be buffered, but with
this change they are!
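
For context, the fix boils down to the same buffer-then-flush pattern on
both sides of the connection. Below is a minimal, self-contained sketch of
that pattern; the names (`sketchConn`, `sendQueue`, `onCandidate`) are
illustrative stand-ins, not the actual types in this repository:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/pion/webrtc/v3"
)

// sketchConn illustrates the buffer-then-flush pattern: candidates that
// arrive before negotiation completes are held under a mutex, flushed
// exactly once afterwards, and later candidates are sent directly.
type sketchConn struct {
	mu        sync.Mutex
	flushed   bool
	pending   []webrtc.ICECandidateInit
	sendQueue chan webrtc.ICECandidateInit
}

func (c *sketchConn) onCandidate(candidate webrtc.ICECandidateInit) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.flushed {
		// Too early to send: the peer would receive the candidate
		// before the offer<->answer exchange is complete.
		c.pending = append(c.pending, candidate)
		return
	}
	c.sendQueue <- candidate
}

// flushPending is called once, after the session descriptions are set.
func (c *sketchConn) flushPending() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, candidate := range c.pending {
		c.sendQueue <- candidate // the queue must be serviced or this blocks
	}
	c.pending = nil
	c.flushed = true
}

func main() {
	c := &sketchConn{sendQueue: make(chan webrtc.ICECandidateInit, 16)}
	c.onCandidate(webrtc.ICECandidateInit{Candidate: "candidate:early"}) // buffered
	c.flushPending()
	fmt.Println(len(c.sendQueue), "candidate(s) flushed") // 1
}
```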

* Use local description instead

* Add logging for candidate flush

* Fix race with atomic bool

* Simplify locks

* Add mutex to flush

* Reset buffer

* Remove leak dependency to limit confusion

* Fix ordering

* Revert channel close

* Flush candidates after remote session description is set

* Bump up count to ensure race is fixed

* Use custom ICE dependency

* Fix data race

* Lower timeout to make for fast CI

* Add back mutex to prevent race

* Improve debug logging

* Lock on local description

* Flush local candidates uniquely

* Fix race

* Move mutex to prevent candidate send race

* Move lock to handshake so no race can occur

* Reduce timeout to improve test times

* Move unlock to defer

* Use flushed bool instead of checking remote
Kyle Carberry, 2022-01-27 09:14:52 -06:00 (committed by GitHub)
parent 9329a50ad6
commit 30dae97c3e
3 changed files with 29 additions and 10 deletions


@@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error {
 	c.conn.dcDisconnectListeners.Sub(1)
 	c.conn.dcFailedListeners.Sub(1)
 	c.conn.dcClosedWaitGroup.Done()
+	return err
 }


@@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
 		dcDisconnectChannel:             make(chan struct{}),
 		dcFailedChannel:                 make(chan struct{}),
 		localCandidateChannel:           make(chan webrtc.ICECandidateInit),
-		localSessionDescriptionChannel:  make(chan webrtc.SessionDescription),
 		pendingCandidates:               make([]webrtc.ICECandidateInit, 0),
+		localSessionDescriptionChannel:  make(chan webrtc.SessionDescription),
 		remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
 	}
 	if client {
@@ -120,8 +120,9 @@ type Conn struct {
 	localSessionDescriptionChannel  chan webrtc.SessionDescription
 	remoteSessionDescriptionChannel chan webrtc.SessionDescription
 
-	pendingCandidates      []webrtc.ICECandidateInit
-	pendingCandidatesMutex sync.Mutex
+	pendingCandidates        []webrtc.ICECandidateInit
+	pendingCandidatesMutex   sync.Mutex
+	pendingCandidatesFlushed bool
 
 	pingChannelID     uint16
 	pingEchoChannelID uint16
@@ -141,15 +142,15 @@ func (c *Conn) init() error {
 		if iceCandidate == nil {
 			return
 		}
-		// ICE Candidates on a remote peer are reset when an offer
-		// is received. We must wait until the offer<->answer has
-		// been negotiated to flush candidates.
 		c.pendingCandidatesMutex.Lock()
 		defer c.pendingCandidatesMutex.Unlock()
-		if c.rtc.RemoteDescription() == nil {
+		if !c.pendingCandidatesFlushed {
+			c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
 			c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
 			return
 		}
+		c.opts.Logger.Debug(context.Background(), "adding local candidate")
 		select {
 		case <-c.closed:
 			break
@@ -282,10 +283,17 @@ func (c *Conn) negotiate() {
 	err := c.rtc.SetRemoteDescription(remoteDescription)
 	if err != nil {
-		c.pendingCandidatesMutex.Unlock()
 		_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
 		return
 	}
 
+	if c.offerrer {
+		// ICE candidates reset when an offer/answer is set for the first
+		// time. If candidates flush before this point, a connection could fail.
+		c.flushPendingCandidates()
+	}
+
 	if !c.offerrer {
 		answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
 		if err != nil {
@@ -305,11 +313,19 @@ func (c *Conn) negotiate() {
 			return
 		case c.localSessionDescriptionChannel <- answer:
 		}
-	}
+
+		// Wait until the local description is set to flush candidates.
+		c.flushPendingCandidates()
+	}
 }
 
+// flushPendingCandidates writes all local candidates to the candidate send channel.
+// The localCandidateChannel is expected to be serviced, otherwise this could block.
+func (c *Conn) flushPendingCandidates() {
+	c.pendingCandidatesMutex.Lock()
+	defer c.pendingCandidatesMutex.Unlock()
 	for _, pendingCandidate := range c.pendingCandidates {
+		c.opts.Logger.Debug(context.Background(), "flushing local candidate")
 		select {
 		case <-c.closed:
 			return
@@ -317,6 +333,7 @@
 		}
 	}
 	c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
+	c.pendingCandidatesFlushed = true
 	c.opts.Logger.Debug(context.Background(), "flushed candidates")
 }
@@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
 
 // AddRemoteCandidate adds a remote candidate to the RTC connection.
 func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
+	c.opts.Logger.Debug(context.Background(), "adding remote candidate")
 	return c.rtc.AddICECandidate(i)
 }
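
The doc comment added to flushPendingCandidates notes that
localCandidateChannel must be serviced or the flush blocks. A sketch of a
consumer that keeps both candidate directions moving might look like the
following; the `Signal` transport and `pumpCandidates` helper are assumed
for illustration and are not part of this change:

```go
package example

import (
	"log"

	"github.com/pion/webrtc/v3"
)

// Signal is a hypothetical signaling transport, assumed for illustration.
type Signal interface {
	SendCandidate(webrtc.ICECandidateInit)
	RemoteCandidates() <-chan webrtc.ICECandidateInit
}

// candidateConn matches the two methods shown in the diff above.
type candidateConn interface {
	LocalCandidate() <-chan webrtc.ICECandidateInit
	AddRemoteCandidate(webrtc.ICECandidateInit) error
}

// pumpCandidates drains LocalCandidate so the flush never blocks, and
// feeds remote candidates back into the connection.
func pumpCandidates(conn candidateConn, signal Signal) {
	go func() {
		for candidate := range conn.LocalCandidate() {
			signal.SendCandidate(candidate)
		}
	}()
	go func() {
		for candidate := range signal.RemoteCandidates() {
			if err := conn.AddRemoteCandidate(candidate); err != nil {
				log.Printf("add remote candidate: %v", err)
				return
			}
		}
	}()
}
```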


@@ -35,11 +35,11 @@ var (
 		// In CI resources are frequently contended, so increasing this value
 		// results in less flakes.
 		if os.Getenv("CI") == "true" {
-			return 4 * time.Second
+			return 3 * time.Second
 		}
 		return 100 * time.Millisecond
 	}()
-	failedTimeout     = disconnectedTimeout * 4
+	failedTimeout     = disconnectedTimeout * 3
 	keepAliveInterval = time.Millisecond * 2
 	// There's a global race in the vnet library allocation code.