feat: use map instead of slice in metrics aggregator (#11815)

This commit is contained in:
Marcin Tojek 2024-01-29 09:12:41 +01:00 committed by GitHub
parent 37e9479815
commit aacb4a2b4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 156 additions and 257 deletions

View File

@ -2,6 +2,9 @@ package prometheusmetrics
import (
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -24,13 +27,15 @@ const (
loggerName = "prometheusmetrics"
sizeCollectCh = 10
sizeUpdateCh = 1024
sizeUpdateCh = 4096
defaultMetricsCleanupInterval = 2 * time.Minute
)
// MetricLabelValueEncoder escapes the characters that carry meaning in an
// encoded label list: a backslash becomes `\\`, and `|`, `,` and `=` are each
// prefixed with a backslash. hashKey applies it to label values before joining
// "name=value" pairs with commas.
var MetricLabelValueEncoder = strings.NewReplacer("\\", "\\\\", "|", "\\|", ",", "\\,", "=", "\\=")
type MetricsAggregator struct {
queue []annotatedMetric
store map[metricKey]annotatedMetric
log slog.Logger
metricsCleanupInterval time.Duration
@ -38,6 +43,7 @@ type MetricsAggregator struct {
collectCh chan (chan []prometheus.Metric)
updateCh chan updateRequest
storeSizeGauge prometheus.Gauge
updateHistogram prometheus.Histogram
cleanupHistogram prometheus.Histogram
}
@ -64,17 +70,37 @@ type annotatedMetric struct {
expiryDate time.Time
}
var _ prometheus.Collector = new(MetricsAggregator)
type metricKey struct {
username string
workspaceName string
agentName string
templateName string
func (am *annotatedMetric) is(req updateRequest, m *agentproto.Stats_Metric) bool {
return am.username == req.username &&
am.workspaceName == req.workspaceName &&
am.agentName == req.agentName &&
am.templateName == req.templateName &&
am.Name == m.Name &&
agentproto.LabelsEqual(am.Labels, m.Labels)
metricName string
labelsStr string
}
// hashKey derives the map key that identifies a metric inside the aggregator
// store. The key combines the request's username, workspace, agent and
// template names with the metric name and a canonical rendering of the
// metric's labels.
//
// Labels with an empty value are left out. Every remaining label is rendered
// as "name=value", with the value passed through MetricLabelValueEncoder, and
// the rendered pairs are sorted before being joined with commas, so the order
// in which labels arrive does not influence the resulting key.
func hashKey(req *updateRequest, m *agentproto.Stats_Metric) metricKey {
	metricLabels := m.GetLabels()
	encoded := make([]string, 0, len(metricLabels))
	for _, l := range metricLabels {
		if l.Value != "" {
			encoded = append(encoded, fmt.Sprintf("%s=%s", l.Name, MetricLabelValueEncoder.Replace(l.Value)))
		}
	}
	// Sorting makes the joined string independent of label order.
	sort.Strings(encoded)

	key := metricKey{
		username:      req.username,
		workspaceName: req.workspaceName,
		agentName:     req.agentName,
		templateName:  req.templateName,
		metricName:    m.Name,
		labelsStr:     strings.Join(encoded, ","),
	}
	return key
}
var _ prometheus.Collector = new(MetricsAggregator)
func (am *annotatedMetric) asPrometheus() (prometheus.Metric, error) {
labels := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
labelValues := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
@ -101,6 +127,17 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
metricsCleanupInterval = duration
}
storeSizeGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "prometheusmetrics",
Name: "metrics_aggregator_store_size",
Help: "The number of metrics stored in the aggregator",
})
err := registerer.Register(storeSizeGauge)
if err != nil {
return nil, err
}
updateHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "prometheusmetrics",
@ -108,7 +145,7 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
Help: "Histogram for duration of metrics aggregator update in seconds.",
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
})
err := registerer.Register(updateHistogram)
err = registerer.Register(updateHistogram)
if err != nil {
return nil, err
}
@ -129,9 +166,12 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
log: logger.Named(loggerName),
metricsCleanupInterval: metricsCleanupInterval,
store: map[metricKey]annotatedMetric{},
collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh),
updateCh: make(chan updateRequest, sizeUpdateCh),
storeSizeGauge: storeSizeGauge,
updateHistogram: updateHistogram,
cleanupHistogram: cleanupHistogram,
}, nil
@ -152,32 +192,32 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
ma.log.Debug(ctx, "update metrics")
timer := prometheus.NewTimer(ma.updateHistogram)
UpdateLoop:
for _, m := range req.metrics {
for i, q := range ma.queue {
if q.is(req, m) {
ma.queue[i].Stats_Metric.Value = m.Value
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
continue UpdateLoop
key := hashKey(&req, m)
if val, ok := ma.store[key]; ok {
val.Stats_Metric.Value = m.Value
val.expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
ma.store[key] = val
} else {
ma.store[key] = annotatedMetric{
Stats_Metric: m,
username: req.username,
workspaceName: req.workspaceName,
agentName: req.agentName,
templateName: req.templateName,
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
}
}
ma.queue = append(ma.queue, annotatedMetric{
Stats_Metric: m,
username: req.username,
workspaceName: req.workspaceName,
agentName: req.agentName,
templateName: req.templateName,
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
})
}
timer.ObserveDuration()
ma.storeSizeGauge.Set(float64(len(ma.store)))
case outputCh := <-ma.collectCh:
ma.log.Debug(ctx, "collect metrics")
output := make([]prometheus.Metric, 0, len(ma.queue))
for _, m := range ma.queue {
output := make([]prometheus.Metric, 0, len(ma.store))
for _, m := range ma.store {
promMetric, err := m.asPrometheus()
if err != nil {
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
@ -191,29 +231,17 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
ma.log.Debug(ctx, "clean expired metrics")
timer := prometheus.NewTimer(ma.cleanupHistogram)
now := time.Now()
var hasExpiredMetrics bool
for _, m := range ma.queue {
if now.After(m.expiryDate) {
hasExpiredMetrics = true
break
for key, val := range ma.store {
if now.After(val.expiryDate) {
delete(ma.store, key)
}
}
if hasExpiredMetrics {
fresh := make([]annotatedMetric, 0, len(ma.queue))
for _, m := range ma.queue {
if m.expiryDate.After(now) {
fresh = append(fresh, m)
}
}
ma.queue = fresh
}
timer.ObserveDuration()
cleanupTicker.Reset(ma.metricsCleanupInterval)
ma.storeSizeGauge.Set(float64(len(ma.store)))
case <-ctx.Done():
ma.log.Debug(ctx, "metrics aggregator is stopped")

View File

@ -1,210 +0,0 @@
package prometheusmetrics
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/agent/proto"
)
func TestAnnotatedMetric_Is(t *testing.T) {
t.Parallel()
am1 := &annotatedMetric{
Stats_Metric: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 1,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
expiryDate: time.Now(),
}
for _, tc := range []struct {
name string
req updateRequest
m *proto.Stats_Metric
is bool
}{
{
name: "OK",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: true,
},
{
name: "missingLabel",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
{
name: "wrongLabelValue",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "inshallah"},
},
},
is: false,
},
{
name: "wrongMetricName",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "cub",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
{
name: "wrongUsername",
req: updateRequest{
username: "steve",
workspaceName: "work",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
{
name: "wrongWorkspaceName",
req: updateRequest{
username: "spike",
workspaceName: "play",
agentName: "janus",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
{
name: "wrongAgentName",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "bond",
templateName: "tempe",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
{
name: "wrongTemplateName",
req: updateRequest{
username: "spike",
workspaceName: "work",
agentName: "janus",
templateName: "phoenix",
metrics: nil,
timestamp: time.Now().Add(-5 * time.Second),
},
m: &proto.Stats_Metric{
Name: "met",
Type: proto.Stats_Metric_COUNTER,
Value: 2,
Labels: []*proto.Stats_Metric_Label{
{Name: "rarity", Value: "blue moon"},
{Name: "certainty", Value: "yes"},
},
},
is: false,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, tc.is, am1.is(tc.req, tc.m))
})
}
}

View File

@ -3,6 +3,7 @@ package prometheusmetrics_test
import (
"context"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
@ -70,6 +71,25 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
{Name: "hello", Value: "world"},
}},
{Name: "d_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6},
{Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 15, Labels: []*agentproto.Stats_Metric_Label{
{Name: "foobar", Value: "Foo,ba=z"},
{Name: "halo", Value: "wor\\,d=1,e=\\,2"},
{Name: "hello", Value: "wo,,r=d"},
}},
{Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6, Labels: []*agentproto.Stats_Metric_Label{
{Name: "empty", Value: ""},
{Name: "foobar", Value: "foobaz"},
}},
}
given3 := []*agentproto.Stats_Metric{
{Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 17, Labels: []*agentproto.Stats_Metric_Label{
{Name: "cat", Value: "do,=g"},
{Name: "hello", Value: "wo,,rld"},
}},
{Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 8, Labels: []*agentproto.Stats_Metric_Label{
{Name: "foobar", Value: "foobaz"},
}},
}
commonLabels := []*agentproto.Stats_Metric_Label{
@ -80,7 +100,6 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
}
expected := []*agentproto.Stats_Metric{
{Name: "a_counter_one", Type: agentproto.Stats_Metric_COUNTER, Value: 1, Labels: commonLabels},
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels},
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: -9, Labels: []*agentproto.Stats_Metric_Label{
{Name: "agent_name", Value: testAgentName},
{Name: "lizz", Value: "rizz"},
@ -88,7 +107,7 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
{Name: "workspace_name", Value: testWorkspaceName},
{Name: "template_name", Value: testTemplateName},
}},
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels},
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels},
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 2, Labels: []*agentproto.Stats_Metric_Label{
{Name: "agent_name", Value: testAgentName},
{Name: "foobar", Value: "Foobaz"},
@ -97,12 +116,38 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
{Name: "workspace_name", Value: testWorkspaceName},
{Name: "template_name", Value: testTemplateName},
}},
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels},
{Name: "d_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6, Labels: commonLabels},
{Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 17, Labels: []*agentproto.Stats_Metric_Label{
{Name: "agent_name", Value: testAgentName},
{Name: "cat", Value: "do,=g"},
{Name: "hello", Value: "wo,,rld"},
{Name: "username", Value: testUsername},
{Name: "workspace_name", Value: testWorkspaceName},
{Name: "template_name", Value: testTemplateName},
}},
{Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 15, Labels: []*agentproto.Stats_Metric_Label{
{Name: "agent_name", Value: testAgentName},
{Name: "foobar", Value: "Foo,ba=z"},
{Name: "halo", Value: "wor\\,d=1,e=\\,2"},
{Name: "hello", Value: "wo,,r=d"},
{Name: "username", Value: testUsername},
{Name: "workspace_name", Value: testWorkspaceName},
{Name: "template_name", Value: testTemplateName},
}},
{Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 8, Labels: []*agentproto.Stats_Metric_Label{
{Name: "agent_name", Value: testAgentName},
{Name: "foobar", Value: "foobaz"},
{Name: "username", Value: testUsername},
{Name: "workspace_name", Value: testWorkspaceName},
{Name: "template_name", Value: testTemplateName},
}},
}
// when
metricsAggregator.Update(ctx, testLabels, given1)
metricsAggregator.Update(ctx, testLabels, given2)
metricsAggregator.Update(ctx, testLabels, given3)
// then
require.Eventually(t, func() bool {
@ -130,6 +175,12 @@ func verifyCollectedMetrics(t *testing.T, expected []*agentproto.Stats_Metric, a
return false
}
sort.Slice(actual, func(i, j int) bool {
m1 := prometheusMetricToString(t, actual[i])
m2 := prometheusMetricToString(t, actual[j])
return m1 < m2
})
for i, e := range expected {
desc := actual[i].Desc()
assert.Contains(t, desc.String(), e.Name)
@ -156,9 +207,39 @@ func verifyCollectedMetrics(t *testing.T, expected []*agentproto.Stats_Metric, a
return true
}
// prometheusMetricToString renders a collected metric as one string so tests
// can sort metrics into a deterministic order. The result is the metric's
// descriptor string, a '|' separator, and then the metric's non-empty labels
// sorted by name and written as comma-separated "name=value" pairs, with each
// value escaped by prometheusmetrics.MetricLabelValueEncoder.
//
// The test fails immediately if the metric cannot be written to its DTO form.
func prometheusMetricToString(t *testing.T, m prometheus.Metric) string {
	t.Helper()

	var sb strings.Builder

	desc := m.Desc()
	_, _ = sb.WriteString(desc.String())
	_ = sb.WriteByte('|')

	var d dto.Metric
	err := m.Write(&d)
	require.NoError(t, err)
	dtoLabels := asMetricAgentLabels(d.GetLabel())
	sort.Slice(dtoLabels, func(i, j int) bool {
		return dtoLabels[i].Name < dtoLabels[j].Name
	})

	// Separate pairs with a comma. Writing the separator before every pair
	// but the first avoids a trailing comma, so nothing has to be trimmed
	// afterwards (trimming could also eat the comma of an escaped "\," that
	// ends the last value).
	wroteLabel := false
	for _, dtoLabel := range dtoLabels {
		if dtoLabel.Value == "" {
			continue
		}
		if wroteLabel {
			_ = sb.WriteByte(',')
		}
		_, _ = sb.WriteString(dtoLabel.Name)
		_ = sb.WriteByte('=')
		_, _ = sb.WriteString(prometheusmetrics.MetricLabelValueEncoder.Replace(dtoLabel.Value))
		wroteLabel = true
	}
	return sb.String()
}
func asMetricAgentLabels(dtoLabels []*dto.LabelPair) []*agentproto.Stats_Metric_Label {
metricLabels := make([]*agentproto.Stats_Metric_Label, 0, len(dtoLabels))
for _, dtoLabel := range dtoLabels {
if dtoLabel.GetValue() == "" {
continue
}
metricLabels = append(metricLabels, &agentproto.Stats_Metric_Label{
Name: dtoLabel.GetName(),
Value: dtoLabel.GetValue(),