Refactor to avoid global state

Signed-off-by: Danny Kopping <danny@coder.com>
This commit is contained in:
Danny Kopping 2024-05-02 11:47:13 +02:00
parent 34083d0190
commit 28a96dec4a
No known key found for this signature in database
GPG Key ID: A1B5D94381738C65
3 changed files with 34 additions and 21 deletions

View File

@ -9,24 +9,29 @@ import (
"golang.org/x/xerrors"
)
var channelID uuid.UUID
// Create a new pubsub channel UUID per coderd instance so that multiple replicas do not clash when performing latency
// measurements, and only create one UUID per instance (and not request) to limit the number of notification channels
// that need to be maintained by the Pubsub implementation.
func init() {
channelID = uuid.New()
// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
// channelIDs maps each Pubsub implementation to the UUID used to build its latency-measurement channel name.
// Unique channel names keep multiple replicas from clashing when performing latency measurements, and keeping
// one UUID per Pubsub impl (and not per request) limits the number of notification channels that need to be
// maintained by the Pubsub impl.
// NOTE(review): nothing in this struct guards the map against concurrent access — confirm callers serialize use.
channelIDs map[Pubsub]uuid.UUID
}
// MeasureLatency takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the
// observed latency.
func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
// NewLatencyMeasurer returns a LatencyMeasurer whose channel ID map is initialized and empty.
func NewLatencyMeasurer() *LatencyMeasurer {
	ids := make(map[Pubsub]uuid.UUID)
	return &LatencyMeasurer{channelIDs: ids}
}
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed
// send and receive latencies.
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
var (
start time.Time
res = make(chan float64, 1)
)
cancel, err := p.Subscribe(latencyChannelName(), func(ctx context.Context, _ []byte) {
cancel, err := p.Subscribe(lm.latencyChannelName(p), func(ctx context.Context, _ []byte) {
res <- time.Since(start).Seconds()
})
if err != nil {
@ -35,7 +40,7 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
defer cancel()
start = time.Now()
err = p.Publish(latencyChannelName(), []byte{})
err = p.Publish(lm.latencyChannelName(p), []byte{})
if err != nil {
return -1, -1, xerrors.Errorf("failed to publish: %w", err)
}
@ -50,6 +55,12 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
}
}
func latencyChannelName() string {
return fmt.Sprintf("latency-measure:%s", channelID.String())
// latencyChannelName returns the name of the pubsub channel used for latency measurements against p. The first call
// for a given Pubsub generates a UUID and stores it in lm.channelIDs; later calls reuse it, so the name stays stable
// for that Pubsub.
// NOTE(review): lm.channelIDs is read and written here without any locking — confirm this is never called
// concurrently.
func (lm *LatencyMeasurer) latencyChannelName(p Pubsub) string {
	const nameFormat = "latency-measure:%s"

	if id, ok := lm.channelIDs[p]; ok {
		return fmt.Sprintf(nameFormat, id.String())
	}

	// First measurement for this Pubsub: mint an ID and remember it.
	id := uuid.New()
	lm.channelIDs[p] = id
	return fmt.Sprintf(nameFormat, id.String())
}

View File

@ -207,6 +207,7 @@ type PGPubsub struct {
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge
latencyMeasurer *LatencyMeasurer
latencyErrCounter atomic.Float64
}
@ -560,7 +561,7 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
send, recv, err := MeasureLatency(ctx, p)
send, recv, err := p.latencyMeasurer.Measure(ctx, p)
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, p.latencyErrCounter.Add(1))
@ -584,10 +585,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
// newWithoutListener creates a new PGPubsub without creating the pqListener.
func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
return &PGPubsub{
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
latencyMeasurer: NewLatencyMeasurer(),
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",

View File

@ -325,7 +325,7 @@ func TestMeasureLatency(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancel()
send, recv, err := pubsub.MeasureLatency(ctx, ps)
send, recv, err := pubsub.NewLatencyMeasurer().Measure(ctx, ps)
require.NoError(t, err)
require.Greater(t, send, 0.0)
require.Greater(t, recv, 0.0)
@ -341,7 +341,7 @@ func TestMeasureLatency(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
send, recv, err := pubsub.MeasureLatency(ctx, ps)
send, recv, err := pubsub.NewLatencyMeasurer().Measure(ctx, ps)
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
require.Greater(t, send, 0.0)
require.EqualValues(t, recv, -1)