mirror of https://github.com/coder/coder.git
feat: Implement aggregator for agent metrics (#7259)
This commit is contained in:
parent
b6666cf1cf
commit
bb0a38b161
|
@ -16,7 +16,6 @@ import (
|
|||
"os"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -1236,11 +1235,11 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
|
|||
// Convert from microseconds to milliseconds.
|
||||
stats.ConnectionMedianLatencyMS /= 1000
|
||||
|
||||
lastStat := a.latestStat.Load()
|
||||
if lastStat != nil && reflect.DeepEqual(lastStat, stats) {
|
||||
a.logger.Info(ctx, "skipping stat because nothing changed")
|
||||
return
|
||||
}
|
||||
// Collect agent metrics.
|
||||
// Agent metrics are changing all the time, so there is no need to perform
|
||||
// reflect.DeepEqual to see if stats should be transferred.
|
||||
stats.Metrics = collectMetrics()
|
||||
|
||||
a.latestStat.Store(stats)
|
||||
|
||||
select {
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"tailscale.com/util/clientmetric"
|
||||
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
)
|
||||
|
||||
func collectMetrics() []agentsdk.AgentMetric {
|
||||
// Tailscale metrics
|
||||
metrics := clientmetric.Metrics()
|
||||
collected := make([]agentsdk.AgentMetric, 0, len(metrics))
|
||||
for _, m := range metrics {
|
||||
if isIgnoredMetric(m.Name()) {
|
||||
continue
|
||||
}
|
||||
|
||||
collected = append(collected, agentsdk.AgentMetric{
|
||||
Name: m.Name(),
|
||||
Type: asMetricType(m.Type()),
|
||||
Value: float64(m.Value()),
|
||||
})
|
||||
}
|
||||
return collected
|
||||
}
|
||||
|
||||
// isIgnoredMetric checks if the metric should be ignored, as Coder agent doesn't use related features.
|
||||
// Expected metric families: magicsock_*, derp_*, tstun_*, netcheck_*, portmap_*, etc.
|
||||
func isIgnoredMetric(metricName string) bool {
|
||||
if strings.HasPrefix(metricName, "dns_") ||
|
||||
strings.HasPrefix(metricName, "controlclient_") ||
|
||||
strings.HasPrefix(metricName, "peerapi_") ||
|
||||
strings.HasPrefix(metricName, "profiles_") ||
|
||||
strings.HasPrefix(metricName, "tstun_") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func asMetricType(typ clientmetric.Type) agentsdk.AgentMetricType {
|
||||
switch typ {
|
||||
case clientmetric.TypeGauge:
|
||||
return agentsdk.AgentMetricTypeGauge
|
||||
case clientmetric.TypeCounter:
|
||||
return agentsdk.AgentMetricTypeCounter
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown metric type: %d", typ))
|
||||
}
|
||||
}
|
|
@ -723,6 +723,20 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
|
|||
return xerrors.Errorf("register agent stats prometheus metric: %w", err)
|
||||
}
|
||||
defer closeAgentStatsFunc()
|
||||
|
||||
metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(logger, options.PrometheusRegistry, 0)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("can't initialize metrics aggregator: %w", err)
|
||||
}
|
||||
|
||||
cancelMetricsAggregator := metricsAggregator.Run(ctx)
|
||||
defer cancelMetricsAggregator()
|
||||
|
||||
options.UpdateAgentMetrics = metricsAggregator.Update
|
||||
err = options.PrometheusRegistry.Register(metricsAggregator)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("can't register metrics aggregator as collector: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:revive
|
||||
|
|
|
@ -5655,6 +5655,44 @@ const docTemplate = `{
|
|||
}
|
||||
}
|
||||
},
|
||||
"agentsdk.AgentMetric": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"name",
|
||||
"type",
|
||||
"value"
|
||||
],
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"counter",
|
||||
"gauge"
|
||||
],
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/agentsdk.AgentMetricType"
|
||||
}
|
||||
]
|
||||
},
|
||||
"value": {
|
||||
"type": "number"
|
||||
}
|
||||
}
|
||||
},
|
||||
"agentsdk.AgentMetricType": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"counter",
|
||||
"gauge"
|
||||
],
|
||||
"x-enum-varnames": [
|
||||
"AgentMetricTypeCounter",
|
||||
"AgentMetricTypeGauge"
|
||||
]
|
||||
},
|
||||
"agentsdk.AuthenticateResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@ -5858,6 +5896,13 @@ const docTemplate = `{
|
|||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"metrics": {
|
||||
"description": "Metrics collected by the agent",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/agentsdk.AgentMetric"
|
||||
}
|
||||
},
|
||||
"rx_bytes": {
|
||||
"description": "RxBytes is the number of received bytes.",
|
||||
"type": "integer"
|
||||
|
|
|
@ -4979,6 +4979,31 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"agentsdk.AgentMetric": {
|
||||
"type": "object",
|
||||
"required": ["name", "type", "value"],
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": ["counter", "gauge"],
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/agentsdk.AgentMetricType"
|
||||
}
|
||||
]
|
||||
},
|
||||
"value": {
|
||||
"type": "number"
|
||||
}
|
||||
}
|
||||
},
|
||||
"agentsdk.AgentMetricType": {
|
||||
"type": "string",
|
||||
"enum": ["counter", "gauge"],
|
||||
"x-enum-varnames": ["AgentMetricTypeCounter", "AgentMetricTypeGauge"]
|
||||
},
|
||||
"agentsdk.AuthenticateResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@ -5177,6 +5202,13 @@
|
|||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"metrics": {
|
||||
"description": "Metrics collected by the agent",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/agentsdk.AgentMetric"
|
||||
}
|
||||
},
|
||||
"rx_bytes": {
|
||||
"description": "RxBytes is the number of received bytes.",
|
||||
"type": "integer"
|
||||
|
|
|
@ -38,6 +38,8 @@ import (
|
|||
"cdr.dev/slog"
|
||||
|
||||
"github.com/coder/coder/buildinfo"
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
|
||||
// Used for swagger docs.
|
||||
_ "github.com/coder/coder/coderd/apidoc"
|
||||
"github.com/coder/coder/coderd/audit"
|
||||
|
@ -146,6 +148,8 @@ type Options struct {
|
|||
SSHConfig codersdk.SSHConfigResponse
|
||||
|
||||
HTTPClient *http.Client
|
||||
|
||||
UpdateAgentMetrics func(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric)
|
||||
}
|
||||
|
||||
// @title Coder API
|
||||
|
|
|
@ -0,0 +1,250 @@
|
|||
package prometheusmetrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog"
|
||||
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
)
|
||||
|
||||
const (
|
||||
// MetricHelpForAgent is a help string that replaces all agent metric help
|
||||
// messages. This is because a registry cannot have conflicting
|
||||
// help messages for the same metric in a "gather". If our coder agents are
|
||||
// on different versions, this is a possible scenario.
|
||||
metricHelpForAgent = "Metrics are forwarded from workspace agents connected to this instance of coderd."
|
||||
)
|
||||
|
||||
const (
|
||||
sizeCollectCh = 10
|
||||
sizeUpdateCh = 1024
|
||||
|
||||
defaultMetricsCleanupInterval = 2 * time.Minute
|
||||
)
|
||||
|
||||
type MetricsAggregator struct {
|
||||
queue []annotatedMetric
|
||||
|
||||
log slog.Logger
|
||||
metricsCleanupInterval time.Duration
|
||||
|
||||
collectCh chan (chan []prometheus.Metric)
|
||||
updateCh chan updateRequest
|
||||
|
||||
updateHistogram prometheus.Histogram
|
||||
cleanupHistogram prometheus.Histogram
|
||||
}
|
||||
|
||||
type updateRequest struct {
|
||||
username string
|
||||
workspaceName string
|
||||
agentName string
|
||||
|
||||
metrics []agentsdk.AgentMetric
|
||||
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type annotatedMetric struct {
|
||||
agentsdk.AgentMetric
|
||||
|
||||
username string
|
||||
workspaceName string
|
||||
agentName string
|
||||
|
||||
expiryDate time.Time
|
||||
}
|
||||
|
||||
var _ prometheus.Collector = new(MetricsAggregator)
|
||||
|
||||
func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, duration time.Duration) (*MetricsAggregator, error) {
|
||||
metricsCleanupInterval := defaultMetricsCleanupInterval
|
||||
if duration > 0 {
|
||||
metricsCleanupInterval = duration
|
||||
}
|
||||
|
||||
updateHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "prometheusmetrics",
|
||||
Name: "metrics_aggregator_execution_update_seconds",
|
||||
Help: "Histogram for duration of metrics aggregator update in seconds.",
|
||||
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
|
||||
})
|
||||
err := registerer.Register(updateHistogram)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cleanupHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "prometheusmetrics",
|
||||
Name: "metrics_aggregator_execution_cleanup_seconds",
|
||||
Help: "Histogram for duration of metrics aggregator cleanup in seconds.",
|
||||
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
|
||||
})
|
||||
err = registerer.Register(cleanupHistogram)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &MetricsAggregator{
|
||||
log: logger,
|
||||
metricsCleanupInterval: metricsCleanupInterval,
|
||||
|
||||
collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh),
|
||||
updateCh: make(chan updateRequest, sizeUpdateCh),
|
||||
|
||||
updateHistogram: updateHistogram,
|
||||
cleanupHistogram: cleanupHistogram,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ma *MetricsAggregator) Run(ctx context.Context) func() {
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
done := make(chan struct{})
|
||||
|
||||
cleanupTicker := time.NewTicker(ma.metricsCleanupInterval)
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer cleanupTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case req := <-ma.updateCh:
|
||||
ma.log.Debug(ctx, "metrics aggregator: update metrics")
|
||||
|
||||
timer := prometheus.NewTimer(ma.updateHistogram)
|
||||
UpdateLoop:
|
||||
for _, m := range req.metrics {
|
||||
for i, q := range ma.queue {
|
||||
if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name {
|
||||
ma.queue[i].AgentMetric.Value = m.Value
|
||||
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
|
||||
continue UpdateLoop
|
||||
}
|
||||
}
|
||||
|
||||
ma.queue = append(ma.queue, annotatedMetric{
|
||||
username: req.username,
|
||||
workspaceName: req.workspaceName,
|
||||
agentName: req.agentName,
|
||||
|
||||
AgentMetric: m,
|
||||
|
||||
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
|
||||
})
|
||||
}
|
||||
|
||||
timer.ObserveDuration()
|
||||
case outputCh := <-ma.collectCh:
|
||||
ma.log.Debug(ctx, "metrics aggregator: collect metrics")
|
||||
|
||||
output := make([]prometheus.Metric, 0, len(ma.queue))
|
||||
for _, m := range ma.queue {
|
||||
desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
|
||||
valueType, err := asPrometheusValueType(m.Type)
|
||||
if err != nil {
|
||||
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
|
||||
continue
|
||||
}
|
||||
constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
|
||||
output = append(output, constMetric)
|
||||
}
|
||||
outputCh <- output
|
||||
close(outputCh)
|
||||
case <-cleanupTicker.C:
|
||||
ma.log.Debug(ctx, "metrics aggregator: clean expired metrics")
|
||||
|
||||
timer := prometheus.NewTimer(ma.cleanupHistogram)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
var hasExpiredMetrics bool
|
||||
for _, m := range ma.queue {
|
||||
if now.After(m.expiryDate) {
|
||||
hasExpiredMetrics = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if hasExpiredMetrics {
|
||||
fresh := make([]annotatedMetric, 0, len(ma.queue))
|
||||
for _, m := range ma.queue {
|
||||
if m.expiryDate.After(now) {
|
||||
fresh = append(fresh, m)
|
||||
}
|
||||
}
|
||||
ma.queue = fresh
|
||||
}
|
||||
|
||||
timer.ObserveDuration()
|
||||
cleanupTicker.Reset(ma.metricsCleanupInterval)
|
||||
|
||||
case <-ctx.Done():
|
||||
ma.log.Debug(ctx, "metrics aggregator: is stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
cancelFunc()
|
||||
<-done
|
||||
}
|
||||
}
|
||||
|
||||
// Describe function does not have any knowledge about the metrics schema,
|
||||
// so it does not emit anything.
|
||||
func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {
|
||||
}
|
||||
|
||||
var agentMetricsLabels = []string{usernameLabel, workspaceNameLabel, agentNameLabel}
|
||||
|
||||
func (ma *MetricsAggregator) Collect(ch chan<- prometheus.Metric) {
|
||||
output := make(chan []prometheus.Metric, 1)
|
||||
|
||||
select {
|
||||
case ma.collectCh <- output:
|
||||
default:
|
||||
ma.log.Error(context.Background(), "metrics aggregator: collect queue is full")
|
||||
return
|
||||
}
|
||||
|
||||
for s := range output {
|
||||
for _, m := range s {
|
||||
ch <- m
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) {
|
||||
select {
|
||||
case ma.updateCh <- updateRequest{
|
||||
username: username,
|
||||
workspaceName: workspaceName,
|
||||
agentName: agentName,
|
||||
metrics: metrics,
|
||||
|
||||
timestamp: time.Now(),
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
ma.log.Debug(ctx, "metrics aggregator: update request is canceled")
|
||||
default:
|
||||
ma.log.Error(ctx, "metrics aggregator: update queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
func asPrometheusValueType(metricType agentsdk.AgentMetricType) (prometheus.ValueType, error) {
|
||||
switch metricType {
|
||||
case agentsdk.AgentMetricTypeGauge:
|
||||
return prometheus.GaugeValue, nil
|
||||
case agentsdk.AgentMetricTypeCounter:
|
||||
return prometheus.CounterValue, nil
|
||||
default:
|
||||
return -1, xerrors.Errorf("unsupported value type: %s", metricType)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
package prometheusmetrics_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/coderd/prometheusmetrics"
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
"github.com/coder/coder/testutil"
|
||||
)
|
||||
|
||||
const (
|
||||
testWorkspaceName = "yogi-workspace"
|
||||
testUsername = "yogi-bear"
|
||||
testAgentName = "main-agent"
|
||||
)
|
||||
|
||||
func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// given
|
||||
registry := prometheus.NewRegistry()
|
||||
metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}), registry, time.Hour) // time.Hour, so metrics won't expire
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancelFunc)
|
||||
|
||||
closeFunc := metricsAggregator.Run(ctx)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
given1 := []agentsdk.AgentMetric{
|
||||
{Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1},
|
||||
{Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 2},
|
||||
{Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 3},
|
||||
}
|
||||
|
||||
given2 := []agentsdk.AgentMetric{
|
||||
{Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4},
|
||||
{Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6},
|
||||
}
|
||||
|
||||
expected := []agentsdk.AgentMetric{
|
||||
{Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1},
|
||||
{Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4},
|
||||
{Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 3},
|
||||
{Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6},
|
||||
}
|
||||
|
||||
// when
|
||||
metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given1)
|
||||
metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given2)
|
||||
|
||||
// then
|
||||
require.Eventually(t, func() bool {
|
||||
var actual []prometheus.Metric
|
||||
metricsCh := make(chan prometheus.Metric)
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
defer close(done)
|
||||
go func() {
|
||||
for m := range metricsCh {
|
||||
actual = append(actual, m)
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
metricsAggregator.Collect(metricsCh)
|
||||
close(metricsCh)
|
||||
<-done
|
||||
return verifyCollectedMetrics(t, expected, actual)
|
||||
}, testutil.WaitMedium, testutil.IntervalSlow)
|
||||
}
|
||||
|
||||
func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actual []prometheus.Metric) bool {
|
||||
if len(expected) != len(actual) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Metrics are expected to arrive in order
|
||||
for i, e := range expected {
|
||||
desc := actual[i].Desc()
|
||||
assert.Contains(t, desc.String(), e.Name)
|
||||
|
||||
var d dto.Metric
|
||||
err := actual[i].Write(&d)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, "agent_name", *d.Label[0].Name)
|
||||
require.Equal(t, testAgentName, *d.Label[0].Value)
|
||||
require.Equal(t, "username", *d.Label[1].Name)
|
||||
require.Equal(t, testUsername, *d.Label[1].Value)
|
||||
require.Equal(t, "workspace_name", *d.Label[2].Name)
|
||||
require.Equal(t, testWorkspaceName, *d.Label[2].Value)
|
||||
|
||||
if e.Type == agentsdk.AgentMetricTypeCounter {
|
||||
require.Equal(t, e.Value, *d.Counter.Value)
|
||||
} else if e.Type == agentsdk.AgentMetricTypeGauge {
|
||||
require.Equal(t, e.Value, *d.Gauge.Value)
|
||||
} else {
|
||||
require.Failf(t, "unsupported type: %s", string(e.Type))
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func TestUpdateMetrics_MetricsExpire(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// given
|
||||
registry := prometheus.NewRegistry()
|
||||
metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}), registry, time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancelFunc)
|
||||
|
||||
closeFunc := metricsAggregator.Run(ctx)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
given := []agentsdk.AgentMetric{
|
||||
{Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1},
|
||||
}
|
||||
|
||||
// when
|
||||
metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given)
|
||||
|
||||
time.Sleep(time.Millisecond * 10) // Ensure that metric is expired
|
||||
|
||||
// then
|
||||
require.Eventually(t, func() bool {
|
||||
var actual []prometheus.Metric
|
||||
metricsCh := make(chan prometheus.Metric)
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
defer close(done)
|
||||
go func() {
|
||||
for m := range metricsCh {
|
||||
actual = append(actual, m)
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
metricsAggregator.Collect(metricsCh)
|
||||
close(metricsCh)
|
||||
<-done
|
||||
return len(actual) == 0
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
}
|
|
@ -22,6 +22,12 @@ import (
|
|||
"github.com/coder/coder/tailnet"
|
||||
)
|
||||
|
||||
const (
|
||||
agentNameLabel = "agent_name"
|
||||
usernameLabel = "username"
|
||||
workspaceNameLabel = "workspace_name"
|
||||
)
|
||||
|
||||
// ActiveUsers tracks the number of users that have authenticated within the past hour.
|
||||
func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) {
|
||||
if duration == 0 {
|
||||
|
@ -140,7 +146,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
|||
Subsystem: "agents",
|
||||
Name: "up",
|
||||
Help: "The number of active agents per workspace.",
|
||||
}, []string{"username", "workspace_name"}))
|
||||
}, []string{usernameLabel, workspaceNameLabel}))
|
||||
err := registerer.Register(agentsGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -151,7 +157,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
|||
Subsystem: "agents",
|
||||
Name: "connections",
|
||||
Help: "Agent connections with statuses.",
|
||||
}, []string{"agent_name", "username", "workspace_name", "status", "lifecycle_state", "tailnet_node"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "status", "lifecycle_state", "tailnet_node"}))
|
||||
err = registerer.Register(agentsConnectionsGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -162,7 +168,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
|||
Subsystem: "agents",
|
||||
Name: "connection_latencies_seconds",
|
||||
Help: "Agent connection latencies in seconds.",
|
||||
}, []string{"agent_name", "username", "workspace_name", "derp_region", "preferred"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "derp_region", "preferred"}))
|
||||
err = registerer.Register(agentsConnectionLatenciesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -173,7 +179,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
|||
Subsystem: "agents",
|
||||
Name: "apps",
|
||||
Help: "Agent applications with statuses.",
|
||||
}, []string{"agent_name", "username", "workspace_name", "app_name", "health"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "app_name", "health"}))
|
||||
err = registerer.Register(agentsAppsGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -333,7 +339,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "tx_bytes",
|
||||
Help: "Agent Tx bytes",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsTxBytesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -344,7 +350,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "rx_bytes",
|
||||
Help: "Agent Rx bytes",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsRxBytesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -355,7 +361,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "connection_count",
|
||||
Help: "The number of established connections by agent",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsConnectionCountGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -366,7 +372,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "connection_median_latency_seconds",
|
||||
Help: "The median agent connection latency in seconds",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsConnectionMedianLatencyGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -377,7 +383,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "session_count_jetbrains",
|
||||
Help: "The number of session established by JetBrains",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsSessionCountJetBrainsGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -388,7 +394,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "session_count_reconnecting_pty",
|
||||
Help: "The number of session established by reconnecting PTY",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsSessionCountReconnectingPTYGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -399,7 +405,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "session_count_ssh",
|
||||
Help: "The number of session established by SSH",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsSessionCountSSHGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -410,7 +416,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
|
|||
Subsystem: "agentstats",
|
||||
Name: "session_count_vscode",
|
||||
Help: "The number of session established by VSCode",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
}, []string{agentNameLabel, usernameLabel, workspaceNameLabel}))
|
||||
err = registerer.Register(agentStatsSessionCountVSCodeGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/mod/semver"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/xerrors"
|
||||
"nhooyr.io/websocket"
|
||||
"tailscale.com/tailcfg"
|
||||
|
@ -258,19 +259,19 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
|
|||
output := make([]string, 0)
|
||||
level := make([]database.LogLevel, 0)
|
||||
outputLength := 0
|
||||
for _, log := range req.Logs {
|
||||
createdAt = append(createdAt, log.CreatedAt)
|
||||
output = append(output, log.Output)
|
||||
outputLength += len(log.Output)
|
||||
if log.Level == "" {
|
||||
for _, logEntry := range req.Logs {
|
||||
createdAt = append(createdAt, logEntry.CreatedAt)
|
||||
output = append(output, logEntry.Output)
|
||||
outputLength += len(logEntry.Output)
|
||||
if logEntry.Level == "" {
|
||||
// Default to "info" to support older agents that didn't have the level field.
|
||||
log.Level = codersdk.LogLevelInfo
|
||||
logEntry.Level = codersdk.LogLevelInfo
|
||||
}
|
||||
parsedLevel := database.LogLevel(log.Level)
|
||||
parsedLevel := database.LogLevel(logEntry.Level)
|
||||
if !parsedLevel.Valid() {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Invalid log level provided.",
|
||||
Detail: fmt.Sprintf("invalid log level: %q", log.Level),
|
||||
Detail: fmt.Sprintf("invalid log level: %q", logEntry.Level),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
@ -1213,39 +1214,58 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
|
|||
}
|
||||
|
||||
now := database.Now()
|
||||
_, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: now,
|
||||
AgentID: workspaceAgent.ID,
|
||||
WorkspaceID: workspace.ID,
|
||||
UserID: workspace.OwnerID,
|
||||
TemplateID: workspace.TemplateID,
|
||||
ConnectionsByProto: payload,
|
||||
ConnectionCount: req.ConnectionCount,
|
||||
RxPackets: req.RxPackets,
|
||||
RxBytes: req.RxBytes,
|
||||
TxPackets: req.TxPackets,
|
||||
TxBytes: req.TxBytes,
|
||||
SessionCountVSCode: req.SessionCountVSCode,
|
||||
SessionCountJetBrains: req.SessionCountJetBrains,
|
||||
SessionCountReconnectingPTY: req.SessionCountReconnectingPTY,
|
||||
SessionCountSSH: req.SessionCountSSH,
|
||||
ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS,
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
if req.ConnectionCount > 0 {
|
||||
err = api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
|
||||
var errGroup errgroup.Group
|
||||
errGroup.Go(func() error {
|
||||
_, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: now,
|
||||
AgentID: workspaceAgent.ID,
|
||||
WorkspaceID: workspace.ID,
|
||||
UserID: workspace.OwnerID,
|
||||
TemplateID: workspace.TemplateID,
|
||||
ConnectionsByProto: payload,
|
||||
ConnectionCount: req.ConnectionCount,
|
||||
RxPackets: req.RxPackets,
|
||||
RxBytes: req.RxBytes,
|
||||
TxPackets: req.TxPackets,
|
||||
TxBytes: req.TxBytes,
|
||||
SessionCountVSCode: req.SessionCountVSCode,
|
||||
SessionCountJetBrains: req.SessionCountJetBrains,
|
||||
SessionCountReconnectingPTY: req.SessionCountReconnectingPTY,
|
||||
SessionCountSSH: req.SessionCountSSH,
|
||||
ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
errGroup.Go(func() error {
|
||||
err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
|
||||
ID: workspace.ID,
|
||||
LastUsedAt: now,
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
return xerrors.Errorf("can't update workspace LastUsedAt: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if api.Options.UpdateAgentMetrics != nil {
|
||||
errGroup.Go(func() error {
|
||||
user, err := api.Database.GetUserByID(ctx, workspace.OwnerID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("can't get user: %w", err)
|
||||
}
|
||||
|
||||
api.Options.UpdateAgentMetrics(ctx, user.Username, workspace.Name, workspaceAgent.Name, req.Metrics)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
err = errGroup.Wait()
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{
|
||||
|
@ -1973,17 +1993,17 @@ func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websock
|
|||
|
||||
func convertWorkspaceAgentStartupLogs(logs []database.WorkspaceAgentStartupLog) []codersdk.WorkspaceAgentStartupLog {
|
||||
sdk := make([]codersdk.WorkspaceAgentStartupLog, 0, len(logs))
|
||||
for _, log := range logs {
|
||||
sdk = append(sdk, convertWorkspaceAgentStartupLog(log))
|
||||
for _, logEntry := range logs {
|
||||
sdk = append(sdk, convertWorkspaceAgentStartupLog(logEntry))
|
||||
}
|
||||
return sdk
|
||||
}
|
||||
|
||||
func convertWorkspaceAgentStartupLog(log database.WorkspaceAgentStartupLog) codersdk.WorkspaceAgentStartupLog {
|
||||
func convertWorkspaceAgentStartupLog(logEntry database.WorkspaceAgentStartupLog) codersdk.WorkspaceAgentStartupLog {
|
||||
return codersdk.WorkspaceAgentStartupLog{
|
||||
ID: log.ID,
|
||||
CreatedAt: log.CreatedAt,
|
||||
Output: log.Output,
|
||||
Level: codersdk.LogLevel(log.Level),
|
||||
ID: logEntry.ID,
|
||||
CreatedAt: logEntry.CreatedAt,
|
||||
Output: logEntry.Output,
|
||||
Level: codersdk.LogLevel(logEntry.Level),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -483,6 +483,22 @@ type Stats struct {
|
|||
// SessionCountSSH is the number of connections received by an agent
|
||||
// that are normal, non-tagged SSH sessions.
|
||||
SessionCountSSH int64 `json:"session_count_ssh"`
|
||||
|
||||
// Metrics collected by the agent
|
||||
Metrics []AgentMetric `json:"metrics"`
|
||||
}
|
||||
|
||||
type AgentMetricType string
|
||||
|
||||
const (
|
||||
AgentMetricTypeCounter AgentMetricType = "counter"
|
||||
AgentMetricTypeGauge AgentMetricType = "gauge"
|
||||
)
|
||||
|
||||
type AgentMetric struct {
|
||||
Name string `json:"name" validate:"required"`
|
||||
Type AgentMetricType `json:"type" validate:"required" enums:"counter,gauge"`
|
||||
Value float64 `json:"value" validate:"required"`
|
||||
}
|
||||
|
||||
type StatsResponse struct {
|
||||
|
|
|
@ -16,6 +16,46 @@
|
|||
| `document` | string | true | | |
|
||||
| `signature` | string | true | | |
|
||||
|
||||
## agentsdk.AgentMetric
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "string",
|
||||
"type": "counter",
|
||||
"value": 0
|
||||
}
|
||||
```
|
||||
|
||||
### Properties
|
||||
|
||||
| Name | Type | Required | Restrictions | Description |
|
||||
| ------- | ---------------------------------------------------- | -------- | ------------ | ----------- |
|
||||
| `name` | string | true | | |
|
||||
| `type` | [agentsdk.AgentMetricType](#agentsdkagentmetrictype) | true | | |
|
||||
| `value` | number | true | | |
|
||||
|
||||
#### Enumerated Values
|
||||
|
||||
| Property | Value |
|
||||
| -------- | --------- |
|
||||
| `type` | `counter` |
|
||||
| `type` | `gauge` |
|
||||
|
||||
## agentsdk.AgentMetricType
|
||||
|
||||
```json
|
||||
"counter"
|
||||
```
|
||||
|
||||
### Properties
|
||||
|
||||
#### Enumerated Values
|
||||
|
||||
| Value |
|
||||
| --------- |
|
||||
| `counter` |
|
||||
| `gauge` |
|
||||
|
||||
## agentsdk.AuthenticateResponse
|
||||
|
||||
```json
|
||||
|
@ -326,6 +366,13 @@
|
|||
"property1": 0,
|
||||
"property2": 0
|
||||
},
|
||||
"metrics": [
|
||||
{
|
||||
"name": "string",
|
||||
"type": "counter",
|
||||
"value": 0
|
||||
}
|
||||
],
|
||||
"rx_bytes": 0,
|
||||
"rx_packets": 0,
|
||||
"session_count_jetbrains": 0,
|
||||
|
@ -339,20 +386,21 @@
|
|||
|
||||
### Properties
|
||||
|
||||
| Name | Type | Required | Restrictions | Description |
|
||||
| -------------------------------- | ------- | -------- | ------------ | ----------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `connection_count` | integer | false | | Connection count is the number of connections received by an agent. |
|
||||
| `connection_median_latency_ms` | number | false | | Connection median latency ms is the median latency of all connections in milliseconds. |
|
||||
| `connections_by_proto` | object | false | | Connections by proto is a count of connections by protocol. |
|
||||
| » `[any property]` | integer | false | | |
|
||||
| `rx_bytes` | integer | false | | Rx bytes is the number of received bytes. |
|
||||
| `rx_packets` | integer | false | | Rx packets is the number of received packets. |
|
||||
| `session_count_jetbrains` | integer | false | | Session count jetbrains is the number of connections received by an agent that are from our JetBrains extension. |
|
||||
| `session_count_reconnecting_pty` | integer | false | | Session count reconnecting pty is the number of connections received by an agent that are from the reconnecting web terminal. |
|
||||
| `session_count_ssh` | integer | false | | Session count ssh is the number of connections received by an agent that are normal, non-tagged SSH sessions. |
|
||||
| `session_count_vscode` | integer | false | | Session count vscode is the number of connections received by an agent that are from our VS Code extension. |
|
||||
| `tx_bytes` | integer | false | | Tx bytes is the number of transmitted bytes. |
|
||||
| `tx_packets` | integer | false | | Tx packets is the number of transmitted bytes. |
|
||||
| Name | Type | Required | Restrictions | Description |
|
||||
| -------------------------------- | ----------------------------------------------------- | -------- | ------------ | ----------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `connection_count` | integer | false | | Connection count is the number of connections received by an agent. |
|
||||
| `connection_median_latency_ms` | number | false | | Connection median latency ms is the median latency of all connections in milliseconds. |
|
||||
| `connections_by_proto` | object | false | | Connections by proto is a count of connections by protocol. |
|
||||
| » `[any property]` | integer | false | | |
|
||||
| `metrics` | array of [agentsdk.AgentMetric](#agentsdkagentmetric) | false | | Metrics collected by the agent |
|
||||
| `rx_bytes` | integer | false | | Rx bytes is the number of received bytes. |
|
||||
| `rx_packets` | integer | false | | Rx packets is the number of received packets. |
|
||||
| `session_count_jetbrains` | integer | false | | Session count jetbrains is the number of connections received by an agent that are from our JetBrains extension. |
|
||||
| `session_count_reconnecting_pty` | integer | false | | Session count reconnecting pty is the number of connections received by an agent that are from the reconnecting web terminal. |
|
||||
| `session_count_ssh` | integer | false | | Session count ssh is the number of connections received by an agent that are normal, non-tagged SSH sessions. |
|
||||
| `session_count_vscode` | integer | false | | Session count vscode is the number of connections received by an agent that are from our VS Code extension. |
|
||||
| `tx_bytes` | integer | false | | Tx bytes is the number of transmitted bytes. |
|
||||
| `tx_packets` | integer | false | | Tx packets is the number of transmitted bytes. |
|
||||
|
||||
## agentsdk.StatsResponse
|
||||
|
||||
|
|
Loading…
Reference in New Issue