ci: Improve peer logging to help identify race (#93)

* ci: Improve peer logging to help identify race

* Remove mutex locks

* Add hash to write
This commit is contained in:
Kyle Carberry 2022-01-29 19:33:19 -06:00 committed by GitHub
parent 3e88f1502a
commit 5367d93b87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 9 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"io"
"sync"
"time"
@ -140,15 +141,43 @@ type Conn struct {
func (c *Conn) init() error {
c.rtc.OnNegotiationNeeded(c.negotiate)
c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
// Close must be locked here otherwise log output can appear
// after the connection has been closed.
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.isClosed() {
return
}
c.opts.Logger.Debug(context.Background(), "ice connection state updated",
slog.F("state", iceConnectionState))
})
c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
// Close can't be locked here, because this is triggered
// when close is called. It doesn't appear this get's
// executed after close though, so it shouldn't cause
// problems.
if c.isClosed() {
return
}
c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
slog.F("state", iceGatherState))
})
c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
if iceCandidate == nil {
return
}
c.opts.Logger.Debug(context.Background(), "adding local candidate")
json := iceCandidate.ToJSON()
c.opts.Logger.Debug(context.Background(), "writing candidate to channel",
slog.F("hash", sha256.Sum224([]byte(json.Candidate))),
slog.F("length", len(json.Candidate)),
)
select {
case <-c.closed:
break
case c.localCandidateChannel <- iceCandidate.ToJSON():
case c.localCandidateChannel <- json:
}
})
c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
@ -169,8 +198,7 @@ func (c *Conn) init() error {
}
c.opts.Logger.Debug(context.Background(), "rtc connection updated",
slog.F("state", pcs),
slog.F("ice", c.rtc.ICEConnectionState()))
slog.F("state", pcs))
switch pcs {
case webrtc.PeerConnectionStateDisconnected:
@ -311,15 +339,22 @@ func (c *Conn) negotiate() {
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
for _, pendingCandidate := range c.pendingRemoteCandidates {
c.opts.Logger.Debug(context.Background(), "flushing remote candidate")
hash := sha256.Sum224([]byte(pendingCandidate.Candidate))
c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate",
slog.F("hash", hash),
slog.F("length", len(pendingCandidate.Candidate)),
)
err := c.rtc.AddICECandidate(pendingCandidate)
if err != nil {
_ = c.CloseWithError(xerrors.Errorf("flush pending candidates: %w", err))
_ = c.CloseWithError(xerrors.Errorf("flush pending remote candidate: %w", err))
return
}
}
c.opts.Logger.Debug(context.Background(), "flushed buffered remote candidates",
slog.F("count", len(c.pendingRemoteCandidates)),
)
c.pendingCandidatesFlushed = true
c.opts.Logger.Debug(context.Background(), "flushed remote candidates")
c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0)
}
// LocalCandidate returns a channel that emits when a local candidate
@ -332,12 +367,16 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
fields := []slog.Field{
slog.F("hash", sha256.Sum224([]byte(i.Candidate))),
slog.F("length", len(i.Candidate)),
}
if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding remote candidate to buffer")
c.opts.Logger.Debug(context.Background(), "bufferring remote candidate", fields...)
c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i)
return nil
}
c.opts.Logger.Debug(context.Background(), "adding remote candidate")
c.opts.Logger.Debug(context.Background(), "adding remote candidate", fields...)
return c.rtc.AddICECandidate(i)
}