fix: close pg PubSub listener to avoid race (#11640)

Fixes flake as seen here: https://github.com/coder/coder/runs/20528529187
This commit is contained in:
Spike Curtis 2024-01-18 09:18:59 +04:00 committed by GitHub
parent 72d9ec07aa
commit 387723a596
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 10 deletions

View File

@ -162,13 +162,15 @@ func (q *msgQueue) dropped() {
// Pubsub implementation using PostgreSQL.
type pgPubsub struct {
ctx context.Context
cancel context.CancelFunc
listenDone chan struct{}
pgListener *pq.Listener
db *sql.DB
mut sync.Mutex
queues map[string]map[uuid.UUID]*msgQueue
ctx context.Context
cancel context.CancelFunc
listenDone chan struct{}
pgListener *pq.Listener
db *sql.DB
mut sync.Mutex
queues map[string]map[uuid.UUID]*msgQueue
closedListener bool
closeListenerErr error
}
// BufferSize is the maximum number of unhandled messages we will buffer
@ -240,15 +242,29 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
// Close closes the pubsub instance.
func (p *pgPubsub) Close() error {
p.cancel()
err := p.pgListener.Close()
err := p.closeListener()
<-p.listenDone
return err
}
// closeListener closes the pgListener, unless it has already been closed.
func (p *pgPubsub) closeListener() error {
p.mut.Lock()
defer p.mut.Unlock()
if p.closedListener {
return p.closeListenerErr
}
p.closeListenerErr = p.pgListener.Close()
p.closedListener = true
return p.closeListenerErr
}
// listen begins receiving messages on the pq listener.
func (p *pgPubsub) listen() {
defer close(p.listenDone)
defer p.pgListener.Close()
defer func() {
_ = p.closeListener()
close(p.listenDone)
}()
var (
notif *pq.Notification