Fix pubsub goroutines in tests (#7677)

Signed-off-by: Spike Curtis <spike@coder.com>
This commit is contained in:
Spike Curtis 2023-05-25 14:46:32 +04:00 committed by GitHub
parent 67cc196c92
commit 05da1e94a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 8 deletions

View File

@ -185,6 +185,13 @@ func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (can
func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
p.mut.Lock()
defer p.mut.Unlock()
defer func() {
if err != nil {
// if we hit an error, we need to close the queue so we don't
// leak its goroutine.
newQ.close()
}
}()
err = p.pgListener.Listen(event)
if errors.Is(err, pq.ErrChannelAlreadyOpen) {

View File

@ -38,7 +38,9 @@ func TestReplica(t *testing.T) {
})
require.NoError(t, err)
defer cancel()
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, nil)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, nil)
require.NoError(t, err)
<-closeChan
_ = server.Close()
@ -62,7 +64,9 @@ func TestReplica(t *testing.T) {
RelayAddress: srv.URL,
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
RelayAddress: "http://169.254.169.254",
})
require.NoError(t, err)
@ -102,7 +106,9 @@ func TestReplica(t *testing.T) {
RelayAddress: srv.URL,
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
RelayAddress: "http://169.254.169.254",
TLSConfig: tlsConfig,
})
@ -125,7 +131,9 @@ func TestReplica(t *testing.T) {
RelayAddress: "http://127.0.0.1:1",
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
PeerTimeout: 1 * time.Millisecond,
RelayAddress: "http://127.0.0.1:1",
})
@ -140,13 +148,15 @@ func TestReplica(t *testing.T) {
// Refresh when a new replica appears!
t.Parallel()
db, pubsub := dbtestutil.NewDB(t)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, nil)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, nil)
require.NoError(t, err)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
peer, err := db.InsertReplica(context.Background(), database.InsertReplicaParams{
peer, err := db.InsertReplica(ctx, database.InsertReplicaParams{
ID: uuid.New(),
RelayAddress: srv.URL,
UpdatedAt: database.Now(),
@ -170,7 +180,9 @@ func TestReplica(t *testing.T) {
UpdatedAt: database.Now().Add(-time.Hour),
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
RelayAddress: "google.com",
CleanupInterval: time.Millisecond,
})
@ -184,6 +196,8 @@ func TestReplica(t *testing.T) {
// Ensures that twenty concurrent replicas can spawn and all
// discover each other in parallel!
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
// This doesn't use the database fake because creating
// this many PostgreSQL connections takes some
// configuration tweaking.
@ -198,7 +212,7 @@ func TestReplica(t *testing.T) {
count := 20
wg.Add(count)
for i := 0; i < count; i++ {
server, err := replicasync.New(context.Background(), logger, db, pubsub, &replicasync.Options{
server, err := replicasync.New(ctx, logger, db, pubsub, &replicasync.Options{
RelayAddress: srv.URL,
})
require.NoError(t, err)