From aacb4a2b4c63e70e5fec54117609389fff5c4fa0 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Mon, 29 Jan 2024 09:12:41 +0100 Subject: [PATCH] feat: use map instead of slice in metrics aggregator (#11815) --- coderd/prometheusmetrics/aggregator.go | 118 ++++++---- .../aggregator_internal_test.go | 210 ------------------ coderd/prometheusmetrics/aggregator_test.go | 85 ++++++- 3 files changed, 156 insertions(+), 257 deletions(-) delete mode 100644 coderd/prometheusmetrics/aggregator_internal_test.go diff --git a/coderd/prometheusmetrics/aggregator.go b/coderd/prometheusmetrics/aggregator.go index aac06d63ef..40ad6c7b2f 100644 --- a/coderd/prometheusmetrics/aggregator.go +++ b/coderd/prometheusmetrics/aggregator.go @@ -2,6 +2,9 @@ package prometheusmetrics import ( "context" + "fmt" + "sort" + "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -24,13 +27,15 @@ const ( loggerName = "prometheusmetrics" sizeCollectCh = 10 - sizeUpdateCh = 1024 + sizeUpdateCh = 4096 defaultMetricsCleanupInterval = 2 * time.Minute ) +var MetricLabelValueEncoder = strings.NewReplacer("\\", "\\\\", "|", "\\|", ",", "\\,", "=", "\\=") + type MetricsAggregator struct { - queue []annotatedMetric + store map[metricKey]annotatedMetric log slog.Logger metricsCleanupInterval time.Duration @@ -38,6 +43,7 @@ type MetricsAggregator struct { collectCh chan (chan []prometheus.Metric) updateCh chan updateRequest + storeSizeGauge prometheus.Gauge updateHistogram prometheus.Histogram cleanupHistogram prometheus.Histogram } @@ -64,17 +70,37 @@ type annotatedMetric struct { expiryDate time.Time } -var _ prometheus.Collector = new(MetricsAggregator) +type metricKey struct { + username string + workspaceName string + agentName string + templateName string -func (am *annotatedMetric) is(req updateRequest, m *agentproto.Stats_Metric) bool { - return am.username == req.username && - am.workspaceName == req.workspaceName && - am.agentName == req.agentName && - am.templateName == req.templateName && - am.Name == m.Name && - agentproto.LabelsEqual(am.Labels, m.Labels) + metricName string + labelsStr string } +func hashKey(req *updateRequest, m *agentproto.Stats_Metric) metricKey { + labelPairs := make(sort.StringSlice, 0, len(m.GetLabels())) + for _, label := range m.GetLabels() { + if label.Value == "" { + continue + } + labelPairs = append(labelPairs, fmt.Sprintf("%s=%s", label.Name, MetricLabelValueEncoder.Replace(label.Value))) + } + labelPairs.Sort() + return metricKey{ + username: req.username, + workspaceName: req.workspaceName, + agentName: req.agentName, + templateName: req.templateName, + metricName: m.Name, + labelsStr: strings.Join(labelPairs, ","), + } +} + +var _ prometheus.Collector = new(MetricsAggregator) + func (am *annotatedMetric) asPrometheus() (prometheus.Metric, error) { labels := make([]string, 0, len(agentMetricsLabels)+len(am.Labels)) labelValues := make([]string, 0, len(agentMetricsLabels)+len(am.Labels)) @@ -101,6 +127,17 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, metricsCleanupInterval = duration } + storeSizeGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "coderd", + Subsystem: "prometheusmetrics", + Name: "metrics_aggregator_store_size", + Help: "The number of metrics stored in the aggregator", + }) + err := registerer.Register(storeSizeGauge) + if err != nil { + return nil, err + } + updateHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "coderd", Subsystem: "prometheusmetrics", @@ -108,7 +145,7 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, Help: "Histogram for duration of metrics aggregator update in seconds.", Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30}, }) - err := registerer.Register(updateHistogram) + err = registerer.Register(updateHistogram) if err != nil { return nil, err } @@ -129,9 +166,12 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, log: logger.Named(loggerName), metricsCleanupInterval: metricsCleanupInterval, + store: map[metricKey]annotatedMetric{}, + collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh), updateCh: make(chan updateRequest, sizeUpdateCh), + storeSizeGauge: storeSizeGauge, updateHistogram: updateHistogram, cleanupHistogram: cleanupHistogram, }, nil @@ -152,32 +192,32 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() { ma.log.Debug(ctx, "update metrics") timer := prometheus.NewTimer(ma.updateHistogram) - UpdateLoop: for _, m := range req.metrics { - for i, q := range ma.queue { - if q.is(req, m) { - ma.queue[i].Stats_Metric.Value = m.Value - ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval) - continue UpdateLoop + key := hashKey(&req, m) + + if val, ok := ma.store[key]; ok { + val.Stats_Metric.Value = m.Value + val.expiryDate = req.timestamp.Add(ma.metricsCleanupInterval) + ma.store[key] = val + } else { + ma.store[key] = annotatedMetric{ + Stats_Metric: m, + username: req.username, + workspaceName: req.workspaceName, + agentName: req.agentName, + templateName: req.templateName, + expiryDate: req.timestamp.Add(ma.metricsCleanupInterval), } } - - ma.queue = append(ma.queue, annotatedMetric{ - Stats_Metric: m, - username: req.username, - workspaceName: req.workspaceName, - agentName: req.agentName, - templateName: req.templateName, - expiryDate: req.timestamp.Add(ma.metricsCleanupInterval), - }) } - timer.ObserveDuration() + + ma.storeSizeGauge.Set(float64(len(ma.store))) case outputCh := <-ma.collectCh: ma.log.Debug(ctx, "collect metrics") - output := make([]prometheus.Metric, 0, len(ma.queue)) - for _, m := range ma.queue { + output := make([]prometheus.Metric, 0, len(ma.store)) + for _, m := range ma.store { promMetric, err := m.asPrometheus() if err != nil { ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err)) @@ -191,29 +231,17 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() { ma.log.Debug(ctx, "clean expired metrics") timer := prometheus.NewTimer(ma.cleanupHistogram) - now := time.Now() - var hasExpiredMetrics bool - for _, m := range ma.queue { - if now.After(m.expiryDate) { - hasExpiredMetrics = true - break + for key, val := range ma.store { + if now.After(val.expiryDate) { + delete(ma.store, key) } } - if hasExpiredMetrics { - fresh := make([]annotatedMetric, 0, len(ma.queue)) - for _, m := range ma.queue { - if m.expiryDate.After(now) { - fresh = append(fresh, m) - } - } - ma.queue = fresh - } - timer.ObserveDuration() cleanupTicker.Reset(ma.metricsCleanupInterval) + ma.storeSizeGauge.Set(float64(len(ma.store))) case <-ctx.Done(): ma.log.Debug(ctx, "metrics aggregator is stopped") diff --git a/coderd/prometheusmetrics/aggregator_internal_test.go b/coderd/prometheusmetrics/aggregator_internal_test.go deleted file mode 100644 index 8830e1b1af..0000000000 --- a/coderd/prometheusmetrics/aggregator_internal_test.go +++ /dev/null @@ -1,210 +0,0 @@ -package prometheusmetrics - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/coder/coder/v2/agent/proto" -) - -func TestAnnotatedMetric_Is(t *testing.T) { - t.Parallel() - am1 := &annotatedMetric{ - Stats_Metric: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 1, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - expiryDate: time.Now(), - } - for _, tc := range []struct { - name string - req updateRequest - m *proto.Stats_Metric - is bool - }{ - { - name: "OK", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: true, - }, - { - name: "missingLabel", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - { - name: "wrongLabelValue", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "inshallah"}, - }, - }, - is: false, - }, - { - name: "wrongMetricName", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "cub", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - { - name: "wrongUsername", - req: updateRequest{ - username: "steve", - workspaceName: "work", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - { - name: "wrongWorkspaceName", - req: updateRequest{ - username: "spike", - workspaceName: "play", - agentName: "janus", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - { - name: "wrongAgentName", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "bond", - templateName: "tempe", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - { - name: "wrongTemplateName", - req: updateRequest{ - username: "spike", - workspaceName: "work", - agentName: "janus", - templateName: "phoenix", - metrics: nil, - timestamp: time.Now().Add(-5 * time.Second), - }, - m: &proto.Stats_Metric{ - Name: "met", - Type: proto.Stats_Metric_COUNTER, - Value: 2, - Labels: []*proto.Stats_Metric_Label{ - {Name: "rarity", Value: "blue moon"}, - {Name: "certainty", Value: "yes"}, - }, - }, - is: false, - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - require.Equal(t, tc.is, am1.is(tc.req, tc.m)) - }) - } -} diff --git a/coderd/prometheusmetrics/aggregator_test.go b/coderd/prometheusmetrics/aggregator_test.go index 00d088f8b1..bc17dc9be7 100644 --- a/coderd/prometheusmetrics/aggregator_test.go +++ b/coderd/prometheusmetrics/aggregator_test.go @@ -3,6 +3,7 @@ package prometheusmetrics_test import ( "context" "sort" + "strings" "sync/atomic" "testing" "time" @@ -70,6 +71,25 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { {Name: "hello", Value: "world"}, }}, {Name: "d_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6}, + {Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 15, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "foobar", Value: "Foo,ba=z"}, + {Name: "halo", Value: "wor\\,d=1,e=\\,2"}, + {Name: "hello", Value: "wo,,r=d"}, + }}, + {Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "empty", Value: ""}, + {Name: "foobar", Value: "foobaz"}, + }}, + } + + given3 := []*agentproto.Stats_Metric{ + {Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 17, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "cat", Value: "do,=g"}, + {Name: "hello", Value: "wo,,rld"}, + }}, + {Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 8, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "foobar", Value: "foobaz"}, + }}, } commonLabels := []*agentproto.Stats_Metric_Label{ @@ -80,7 +100,6 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { } expected := []*agentproto.Stats_Metric{ {Name: "a_counter_one", Type: agentproto.Stats_Metric_COUNTER, Value: 1, Labels: commonLabels}, - {Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels}, {Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: -9, Labels: []*agentproto.Stats_Metric_Label{ {Name: "agent_name", Value: testAgentName}, {Name: "lizz", Value: "rizz"}, @@ -88,7 +107,7 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { {Name: "workspace_name", Value: testWorkspaceName}, {Name: "template_name", Value: testTemplateName}, }}, - {Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels}, + {Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels}, {Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 2, Labels: []*agentproto.Stats_Metric_Label{ {Name: "agent_name", Value: testAgentName}, {Name: "foobar", Value: "Foobaz"}, @@ -97,12 +116,38 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { {Name: "workspace_name", Value: testWorkspaceName}, {Name: "template_name", Value: testTemplateName}, }}, + {Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels}, {Name: "d_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6, Labels: commonLabels}, + {Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 17, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "agent_name", Value: testAgentName}, + {Name: "cat", Value: "do,=g"}, + {Name: "hello", Value: "wo,,rld"}, + {Name: "username", Value: testUsername}, + {Name: "workspace_name", Value: testWorkspaceName}, + {Name: "template_name", Value: testTemplateName}, + }}, + {Name: "e_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 15, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "agent_name", Value: testAgentName}, + {Name: "foobar", Value: "Foo,ba=z"}, + {Name: "halo", Value: "wor\\,d=1,e=\\,2"}, + {Name: "hello", Value: "wo,,r=d"}, + {Name: "username", Value: testUsername}, + {Name: "workspace_name", Value: testWorkspaceName}, + {Name: "template_name", Value: testTemplateName}, + }}, + {Name: "f_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 8, Labels: []*agentproto.Stats_Metric_Label{ + {Name: "agent_name", Value: testAgentName}, + {Name: "foobar", Value: "foobaz"}, + {Name: "username", Value: testUsername}, + {Name: "workspace_name", Value: testWorkspaceName}, + {Name: "template_name", Value: testTemplateName}, + }}, } // when metricsAggregator.Update(ctx, testLabels, given1) metricsAggregator.Update(ctx, testLabels, given2) + metricsAggregator.Update(ctx, testLabels, given3) // then require.Eventually(t, func() bool { @@ -130,6 +175,12 @@ func verifyCollectedMetrics(t *testing.T, expected []*agentproto.Stats_Metric, a return false } + sort.Slice(actual, func(i, j int) bool { + m1 := prometheusMetricToString(t, actual[i]) + m2 := prometheusMetricToString(t, actual[j]) + return m1 < m2 + }) + for i, e := range expected { desc := actual[i].Desc() assert.Contains(t, desc.String(), e.Name) @@ -156,9 +207,39 @@ func verifyCollectedMetrics(t *testing.T, expected []*agentproto.Stats_Metric, a return true } +func prometheusMetricToString(t *testing.T, m prometheus.Metric) string { + var sb strings.Builder + + desc := m.Desc() + _, _ = sb.WriteString(desc.String()) + _ = sb.WriteByte('|') + + var d dto.Metric + err := m.Write(&d) + require.NoError(t, err) + dtoLabels := asMetricAgentLabels(d.GetLabel()) + sort.Slice(dtoLabels, func(i, j int) bool { + return dtoLabels[i].Name < dtoLabels[j].Name + }) + + for _, dtoLabel := range dtoLabels { + if dtoLabel.Value == "" { + continue + } + _, _ = sb.WriteString(dtoLabel.Name) + _ = sb.WriteByte('=') + _, _ = sb.WriteString(prometheusmetrics.MetricLabelValueEncoder.Replace(dtoLabel.Value)) + } + return strings.TrimRight(sb.String(), ",") +} + func asMetricAgentLabels(dtoLabels []*dto.LabelPair) []*agentproto.Stats_Metric_Label { metricLabels := make([]*agentproto.Stats_Metric_Label, 0, len(dtoLabels)) for _, dtoLabel := range dtoLabels { + if dtoLabel.GetValue() == "" { + continue + } + metricLabels = append(metricLabels, &agentproto.Stats_Metric_Label{ Name: dtoLabel.GetName(), Value: dtoLabel.GetValue(),