chore: Update pion/ice fork to resolve goroutine leak (#78)

* chore: Update pion/ice fork to resolve goroutine leak

* Flush remote too

* Add logs for setting the description

* Try locking only on remote

* Remove local bufferring in favor of remote

* Remove unused flush func

* Set candidates flushed to true

* Defer flush until the end of negotiation

* Buffer ICE candidates

* Add comment clarifying channel buffer

* Flush after handshake

* Move away from fork

* Ignore pion/ice leaks
This commit is contained in:
Kyle Carberry 2022-01-27 20:43:55 -06:00 committed by GitHub
parent 30dae97c3e
commit 27f7299383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 47 deletions

3
go.mod
View File

@ -2,9 +2,6 @@ module github.com/coder/coder
go 1.17
// Required until https://github.com/pion/ice/pull/413 is merged.
replace github.com/pion/ice/v2 => github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344
// Required until https://github.com/hashicorp/terraform-config-inspect/pull/74 is merged.
replace github.com/hashicorp/terraform-config-inspect => github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88

7
go.sum
View File

@ -835,8 +835,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4=
github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344 h1:rXpDqMPlbnKASSBFwPrJbT2wEL5jZzIX/i0cvwISxlM=
github.com/kylecarbs/ice/v2 v2.1.8-0.20220127013758-526c25708344/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA=
github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88 h1:tvG/qs5c4worwGyGnbbb4i/dYYLjpFwDMqcIT3awAf8=
github.com/kylecarbs/terraform-config-inspect v0.0.0-20211215004401-bbc517866b88/go.mod h1:Z0Nnk4+3Cy89smEbrq+sl1bxc9198gIP4I7wcQF6Kqs=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
@ -1014,8 +1012,12 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E=
github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ=
github.com/pion/dtls/v2 v2.0.13/go.mod h1:OaE7eTM+ppaUhJ99OTO4aHl9uY6vPrT1gPY27uNTxRY=
github.com/pion/dtls/v2 v2.1.0 h1:g6gtKVNLp6URDkv9OijFJl16kqGHzVzZG+Fa4A38GTY=
github.com/pion/dtls/v2 v2.1.0/go.mod h1:qG3gA7ZPZemBqpEFqRKyURYdKEwFZQCGb7gv9T3ON3Y=
github.com/pion/ice/v2 v2.1.18/go.mod h1:9jDr0iIUg8P6+0Jq8QJ/eFSkX3JnsPd293TjCdkfpTs=
github.com/pion/ice/v2 v2.1.19 h1:z7iVx/fHlqvPILUbvcj1xjuz/6eVKgEFOM8h1AuLbF8=
github.com/pion/ice/v2 v2.1.19/go.mod h1:E5frMpIJ3zzcQiRo+XyT7z1IiAsGc1hDURcVJQUzGWA=
github.com/pion/interceptor v0.1.6/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U=
github.com/pion/interceptor v0.1.7 h1:HThW0tIIKT9RRoDWGURe8rlZVOx0fJHxBHpA0ej0+bo=
github.com/pion/interceptor v0.1.7/go.mod h1:Lh3JSl/cbJ2wP8I3ccrjh1K/deRGRn3UlSPuOTiHb6U=
@ -1311,6 +1313,7 @@ golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 h1:kACShD3qhmr/3rLmg1yXyt+N4HcwutKyPRB93s54TIU=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=

View File

@ -62,17 +62,19 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
return nil, xerrors.Errorf("create peer connection: %w", err)
}
conn := &Conn{
pingChannelID: 1,
pingEchoChannelID: 2,
opts: opts,
rtc: rtc,
offerrer: client,
closed: make(chan struct{}),
dcOpenChannel: make(chan *webrtc.DataChannel),
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
localCandidateChannel: make(chan webrtc.ICECandidateInit),
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
pingChannelID: 1,
pingEchoChannelID: 2,
opts: opts,
rtc: rtc,
offerrer: client,
closed: make(chan struct{}),
dcOpenChannel: make(chan *webrtc.DataChannel),
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
// This channel needs to be bufferred otherwise slow consumers
// of this will cause a connection failure.
localCandidateChannel: make(chan webrtc.ICECandidateInit, 16),
pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
}
@ -120,7 +122,7 @@ type Conn struct {
localSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionChannel chan webrtc.SessionDescription
pendingCandidates []webrtc.ICECandidateInit
pendingRemoteCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidatesFlushed bool
@ -142,14 +144,6 @@ func (c *Conn) init() error {
if iceCandidate == nil {
return
}
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
return
}
c.opts.Logger.Debug(context.Background(), "adding local candidate")
select {
case <-c.closed:
@ -262,6 +256,7 @@ func (c *Conn) negotiate() {
_ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
return
}
c.opts.Logger.Debug(context.Background(), "setting local description")
err = c.rtc.SetLocalDescription(offer)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
@ -281,25 +276,20 @@ func (c *Conn) negotiate() {
case remoteDescription = <-c.remoteSessionDescriptionChannel:
}
c.opts.Logger.Debug(context.Background(), "setting remote description")
err := c.rtc.SetRemoteDescription(remoteDescription)
if err != nil {
c.pendingCandidatesMutex.Unlock()
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
return
}
if c.offerrer {
// ICE candidates reset when an offer/answer is set for the first
// time. If candidates flush before this point, a connection could fail.
c.flushPendingCandidates()
}
if !c.offerrer {
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
return
}
c.opts.Logger.Debug(context.Background(), "setting local description")
err = c.rtc.SetLocalDescription(answer)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
@ -313,28 +303,23 @@ func (c *Conn) negotiate() {
return
case c.localSessionDescriptionChannel <- answer:
}
// Wait until the local description is set to flush candidates.
c.flushPendingCandidates()
}
}
// flushPendingCandidates writes all local candidates to the candidate send channel.
// The localCandidateChannel is expected to be serviced, otherwise this could block.
func (c *Conn) flushPendingCandidates() {
// The ICE transport resets when the remote description is updated.
// Adding ICE candidates before this point causes a failed connection,
// because the candidate would be lost.
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
for _, pendingCandidate := range c.pendingCandidates {
c.opts.Logger.Debug(context.Background(), "flushing local candidate")
select {
case <-c.closed:
for _, pendingCandidate := range c.pendingRemoteCandidates {
c.opts.Logger.Debug(context.Background(), "flushing remote candidate")
err := c.rtc.AddICECandidate(pendingCandidate)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err))
return
case c.localCandidateChannel <- pendingCandidate:
}
}
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
c.pendingCandidatesFlushed = true
c.opts.Logger.Debug(context.Background(), "flushed candidates")
c.opts.Logger.Debug(context.Background(), "flushed remote candidates")
}
// LocalCandidate returns a channel that emits when a local candidate
@ -345,6 +330,13 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
// AddRemoteCandidate adds a remote candidate to the RTC connection.
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer")
c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i)
return nil
}
c.opts.Logger.Debug(context.Background(), "adding remote candidate")
return c.rtc.AddICECandidate(i)
}

View File

@ -48,7 +48,14 @@ var (
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
// pion/ice doesn't properly close immediately. The solution for this isn't yet known. See:
// https://github.com/pion/ice/pull/413
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func1"),
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func2"),
goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).taskLoop"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
)
}
func TestConn(t *testing.T) {