Track errors

Signed-off-by: Danny Kopping <danny@coder.com>
This commit is contained in:
Danny Kopping 2024-05-02 09:25:02 +02:00
parent c249b70330
commit f845bda8e5
No known key found for this signature in database
GPG Key ID: A1B5D94381738C65
1 changed files with 19 additions and 8 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/xerrors"
"cdr.dev/slog"
@ -205,6 +206,8 @@ type PGPubsub struct {
receivedBytesTotal prometheus.Counter
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge
latencyErrCounter atomic.Float64
}
// BufferSize is the maximum number of unhandled messages we will buffer
@ -480,16 +483,21 @@ var (
// additional metrics collected out-of-band
var (
sendLatencyDesc = prometheus.NewDesc(
pubsubSendLatencyDesc = prometheus.NewDesc(
"coder_pubsub_send_latency_seconds",
"The time taken to send a message into a pubsub event channel",
nil, nil,
)
recvLatencyDesc = prometheus.NewDesc(
pubsubRecvLatencyDesc = prometheus.NewDesc(
"coder_pubsub_receive_latency_seconds",
"The time taken to receive a message from a pubsub event channel",
nil, nil,
)
pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
"coder_pubsub_latency_measure_errs_total",
"The number of pubsub latency measurement failures",
nil, nil,
)
)
// We'll track messages as size "normal" and "colossal", where the
@ -520,8 +528,9 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
descs <- currentEventsDesc
// additional metrics
descs <- sendLatencyDesc
descs <- recvLatencyDesc
descs <- pubsubSendLatencyDesc
descs <- pubsubRecvLatencyDesc
descs <- pubsubLatencyMeasureErrDesc
}
// Collect implements, along with Describe, the prometheus.Collector interface
@ -548,15 +557,17 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
// additional metrics
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) // TODO: configurable?
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
send, recv, err := MeasureLatency(ctx, p)
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
} else {
metrics <- prometheus.MustNewConstMetric(sendLatencyDesc, prometheus.GaugeValue, send)
metrics <- prometheus.MustNewConstMetric(recvLatencyDesc, prometheus.GaugeValue, recv)
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, p.latencyErrCounter.Add(1))
return
}
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send)
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv)
}
// New creates a new Pubsub implementation using a PostgreSQL connection.