Measuring pubsub latency and exposing as Prometheus metric

Signed-off-by: Danny Kopping <danny@coder.com>
This commit is contained in:
Danny Kopping 2024-05-01 17:46:00 +02:00
parent 3ff9cef498
commit c249b70330
No known key found for this signature in database
GPG Key ID: A1B5D94381738C65
3 changed files with 122 additions and 0 deletions

View File

@ -0,0 +1,56 @@
package pubsub
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
)
// channelID is a pubsub channel UUID created once per coderd instance so that multiple replicas do not clash when
// performing latency measurements. Only one UUID is created per instance (and not per request) to limit the number
// of notification channels that need to be maintained by the Pubsub implementation.
var channelID = uuid.New()

// MeasureLatency takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the
// observed latency.
//
// send is the number of seconds the Publish call took. recv is the number of seconds between the moment just before
// publishing and the moment the subscription callback observed the message. A value of -1 is returned for any
// measurement that could not be taken.
//
// If ctx is done before the message is received, the send latency is still returned alongside ctx.Err().
func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
	var (
		start   time.Time
		res     = make(chan float64, 1)
		channel = latencyChannelName()
		// The channel name is shared by every measurement made by this process, so each call publishes a unique
		// payload. This lets the callback ignore messages that belong to a concurrent measurement or that were
		// left over from an earlier one which timed out.
		msg = uuid.New().String()
	)

	cancel, err := p.Subscribe(channel, func(_ context.Context, in []byte) {
		if string(in) != msg {
			// Not our message; ignoring it also guarantees start is only read after it has been set below.
			return
		}

		// Never block the caller of this callback: if a result is already buffered, drop the duplicate.
		select {
		case res <- time.Since(start).Seconds():
		default:
		}
	})
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
	}
	defer cancel()

	start = time.Now()
	err = p.Publish(channel, []byte(msg))
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
	}
	send = time.Since(start).Seconds()

	select {
	case <-ctx.Done():
		// Report the real reason the context ended (deadline exceeded or cancellation).
		return send, -1, ctx.Err()
	case val := <-res:
		return send, val, nil
	}
}

// latencyChannelName returns the name of the pubsub channel used for latency measurements by this instance.
func latencyChannelName() string {
	return fmt.Sprintf("latency-measure:%s", channelID.String())
}

View File

@ -478,6 +478,20 @@ var (
)
)
// Descriptors for the additional latency metrics, which are collected out-of-band.
var (
	// sendLatencyDesc describes the metric for the time taken to send a
	// message into a pubsub event channel, in seconds.
	sendLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_send_latency_seconds",
		"The time taken to send a message into a pubsub event channel",
		nil, // no variable labels
		nil, // no constant labels
	)

	// recvLatencyDesc describes the metric for the time taken to receive a
	// message from a pubsub event channel, in seconds.
	recvLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_receive_latency_seconds",
		"The time taken to receive a message from a pubsub event channel",
		nil, // no variable labels
		nil, // no constant labels
	)
)
// We'll track messages as size "normal" and "colossal", where the
// latter are messages larger than 7600 bytes, or 95% of the postgres
// notify limit. If we see a lot of colossal packets that's an indication that
@ -504,6 +518,10 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
// implicit metrics
descs <- currentSubscribersDesc
descs <- currentEventsDesc
// additional metrics
descs <- sendLatencyDesc
descs <- recvLatencyDesc
}
// Collect implements, along with Describe, the prometheus.Collector interface
@ -528,6 +546,17 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
p.qMu.Unlock()
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
// additional metrics
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) // TODO: configurable?
defer cancel()
send, recv, err := MeasureLatency(ctx, p)
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
} else {
metrics <- prometheus.MustNewConstMetric(sendLatencyDesc, prometheus.GaugeValue, send)
metrics <- prometheus.MustNewConstMetric(recvLatencyDesc, prometheus.GaugeValue, recv)
}
}
// New creates a new Pubsub implementation using a PostgreSQL connection.

View File

@ -294,3 +294,40 @@ func TestPubsub_Disconnect(t *testing.T) {
}
require.True(t, gotDroppedErr)
}
// TestMeasureLatency exercises pubsub.MeasureLatency against a Pubsub created by pubsub.New on a test database,
// covering both a successful measurement and a receive that times out.
func TestMeasureLatency(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

	connectionURL, closePg, err := dbtestutil.Open()
	require.NoError(t, err)
	defer closePg()
	db, err := sql.Open("postgres", connectionURL)
	require.NoError(t, err)
	defer db.Close()
	ps, err := pubsub.New(ctx, logger, db, connectionURL)
	require.NoError(t, err)
	defer ps.Close()

	t.Run("MeasureLatency", func(t *testing.T) {
		tCtx, tCancel := context.WithTimeout(ctx, testutil.WaitSuperLong)
		defer tCancel()

		send, recv, err := pubsub.MeasureLatency(tCtx, ps)
		require.NoError(t, err)
		// Positive (rather than NotZero) so that the -1 "no measurement" sentinel cannot satisfy the assertion.
		require.Positive(t, send)
		require.Positive(t, recv)
	})

	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
		// A one-nanosecond timeout is intended to expire before the published message can be received.
		tCtx, tCancel := context.WithTimeout(ctx, time.Nanosecond)
		defer tCancel()

		send, recv, err := pubsub.MeasureLatency(tCtx, ps)
		require.ErrorIs(t, err, context.DeadlineExceeded)
		// Publishing does not depend on the context, so the send latency is still measured.
		require.Positive(t, send)
		require.EqualValues(t, -1, recv)
	})
}