feat: Add active users prometheus metric (#3406)

This  allows deployments using our Prometheus export t determine
the number of active users in the past hour.

The interval is an hour to align with API key last used refresh times.

SSH connections poll to check shutdown time, so this will be accurate
even on long-running connections without dashboard requests.
This commit is contained in:
Kyle Carberry 2022-08-08 10:09:46 -05:00 committed by GitHub
parent 13a2014d7f
commit 3279504cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 307 additions and 55 deletions

View File

@ -11,6 +11,7 @@
"coderdtest",
"codersdk",
"cronstrue",
"databasefake",
"devel",
"drpc",
"drpcconn",
@ -52,6 +53,7 @@
"oneof",
"parameterscopeid",
"pqtype",
"prometheusmetrics",
"promptui",
"protobuf",
"provisionerd",
@ -72,6 +74,7 @@
"templateversions",
"testdata",
"testid",
"testutil",
"tfexec",
"tfjson",
"tfplan",

View File

@ -30,6 +30,7 @@ import (
"github.com/google/uuid"
"github.com/pion/turn/v2"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/afero"
"github.com/spf13/cobra"
@ -53,6 +54,7 @@ import (
"github.com/coder/coder/coderd/database/databasefake"
"github.com/coder/coder/coderd/devtunnel"
"github.com/coder/coder/coderd/gitsshkey"
"github.com/coder/coder/coderd/prometheusmetrics"
"github.com/coder/coder/coderd/telemetry"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/coderd/turnconn"
@ -392,6 +394,26 @@ func server() *cobra.Command {
defer options.Telemetry.Close()
}
// This prevents the pprof import from being accidentally deleted.
_ = pprof.Handler
if pprofEnabled {
//nolint:revive
defer serveHandler(ctx, logger, nil, pprofAddress, "pprof")()
}
if promEnabled {
options.PrometheusRegistry = prometheus.NewRegistry()
closeFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegistry, options.Database, 0)
if err != nil {
return xerrors.Errorf("register active users prometheus metric: %w", err)
}
defer closeFunc()
//nolint:revive
defer serveHandler(ctx, logger, promhttp.InstrumentMetricHandler(
options.PrometheusRegistry, promhttp.HandlerFor(options.PrometheusRegistry, promhttp.HandlerOpts{}),
), promAddress, "prometheus")()
}
coderAPI := coderd.New(options)
defer coderAPI.Close()
@ -406,17 +428,6 @@ func server() *cobra.Command {
}
}
// This prevents the pprof import from being accidentally deleted.
_ = pprof.Handler
if pprofEnabled {
//nolint:revive
defer serveHandler(ctx, logger, nil, pprofAddress, "pprof")()
}
if promEnabled {
//nolint:revive
defer serveHandler(ctx, logger, promhttp.Handler(), promAddress, "prometheus")()
}
// Since errCh only has one buffered slot, all routines
// sending on it must be wrapped in a select/default to
// avoid leaving dangling goroutines waiting for the

View File

@ -1,6 +1,7 @@
package cli_test
import (
"bufio"
"context"
"crypto/ecdsa"
"crypto/elliptic"
@ -10,6 +11,7 @@ import (
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"fmt"
"math/big"
"net"
"net/http"
@ -17,10 +19,13 @@ import (
"net/url"
"os"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/go-chi/chi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
@ -374,6 +379,56 @@ func TestServer(t *testing.T) {
cancelFunc()
<-errC
})
t.Run("Prometheus", func(t *testing.T) {
t.Parallel()
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
random, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
_ = random.Close()
tcpAddr, valid := random.Addr().(*net.TCPAddr)
require.True(t, valid)
randomPort := tcpAddr.Port
root, cfg := clitest.New(t,
"server",
"--in-memory",
"--address", ":0",
"--provisioner-daemons", "1",
"--prometheus-enable",
"--prometheus-address", ":"+strconv.Itoa(randomPort),
"--cache-dir", t.TempDir(),
)
serverErr := make(chan error, 1)
go func() {
serverErr <- root.ExecuteContext(ctx)
}()
_ = waitAccessURL(t, cfg)
var res *http.Response
require.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://127.0.0.1:%d", randomPort), nil)
assert.NoError(t, err)
res, err = http.DefaultClient.Do(req)
return err == nil
}, testutil.WaitShort, testutil.IntervalFast)
scanner := bufio.NewScanner(res.Body)
hasActiveUsers := false
for scanner.Scan() {
// This metric is manually registered to be tracked in the server. That's
// why we test it's tracked here.
if strings.HasPrefix(scanner.Text(), "coderd_api_active_users_duration_hour") {
hasActiveUsers = true
continue
}
}
require.NoError(t, scanner.Err())
require.True(t, hasActiveUsers)
cancelFunc()
<-serverErr
})
}
func generateTLSCertificate(t testing.TB) (certPath, keyPath string) {

View File

@ -16,6 +16,7 @@ import (
"github.com/go-chi/chi/v5/middleware"
"github.com/klauspost/compress/zstd"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/xerrors"
"google.golang.org/api/idtoken"
@ -58,6 +59,7 @@ type Options struct {
GoogleTokenValidator *idtoken.Validator
GithubOAuth2Config *GithubOAuth2Config
OIDCConfig *OIDCConfig
PrometheusRegistry *prometheus.Registry
ICEServers []webrtc.ICEServer
SecureAuthCookie bool
SSHKeygenAlgorithm gitsshkey.Algorithm
@ -87,6 +89,9 @@ func New(options *Options) *API {
panic(xerrors.Errorf("rego authorize panic: %w", err))
}
}
if options.PrometheusRegistry == nil {
options.PrometheusRegistry = prometheus.NewRegistry()
}
siteCacheDir := options.CacheDir
if siteCacheDir != "" {
@ -116,7 +121,7 @@ func New(options *Options) *API {
next.ServeHTTP(middleware.NewWrapResponseWriter(w, r.ProtoMajor), r)
})
},
httpmw.Prometheus,
httpmw.Prometheus(options.PrometheusRegistry),
tracing.HTTPMW(api.TracerProvider, "coderd.http"),
)

View File

@ -12,26 +12,31 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
requestsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
func durationToFloatMs(d time.Duration) float64 {
return float64(d.Milliseconds())
}
func Prometheus(register prometheus.Registerer) func(http.Handler) http.Handler {
factory := promauto.With(register)
requestsProcessed := factory.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "requests_processed_total",
Help: "The total number of processed API requests",
}, []string{"code", "method", "path"})
requestsConcurrent = promauto.NewGauge(prometheus.GaugeOpts{
requestsConcurrent := factory.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "concurrent_requests",
Help: "The number of concurrent API requests",
})
websocketsConcurrent = promauto.NewGauge(prometheus.GaugeOpts{
websocketsConcurrent := factory.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "concurrent_websockets",
Help: "The total number of concurrent API websockets",
})
websocketsDist = promauto.NewHistogramVec(prometheus.HistogramOpts{
websocketsDist := factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "websocket_durations_ms",
@ -45,58 +50,55 @@ var (
durationToFloatMs(30 * time.Hour),
},
}, []string{"path"})
requestsDist = promauto.NewHistogramVec(prometheus.HistogramOpts{
requestsDist := factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "request_latencies_ms",
Help: "Latency distribution of requests in milliseconds",
Buckets: []float64{1, 5, 10, 25, 50, 100, 500, 1000, 5000, 10000, 30000},
}, []string{"method", "path"})
)
func durationToFloatMs(d time.Duration) float64 {
return float64(d.Milliseconds())
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
start = time.Now()
method = r.Method
rctx = chi.RouteContext(r.Context())
)
func Prometheus(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
start = time.Now()
method = r.Method
rctx = chi.RouteContext(r.Context())
)
sw, ok := w.(chimw.WrapResponseWriter)
if !ok {
panic("dev error: http.ResponseWriter is not chimw.WrapResponseWriter")
}
sw, ok := w.(chimw.WrapResponseWriter)
if !ok {
panic("dev error: http.ResponseWriter is not chimw.WrapResponseWriter")
}
var (
dist *prometheus.HistogramVec
distOpts []string
)
// We want to count websockets separately.
if isWebsocketUpgrade(r) {
websocketsConcurrent.Inc()
defer websocketsConcurrent.Dec()
var (
dist *prometheus.HistogramVec
distOpts []string
)
// We want to count WebSockets separately.
if isWebsocketUpgrade(r) {
websocketsConcurrent.Inc()
defer websocketsConcurrent.Dec()
dist = websocketsDist
} else {
requestsConcurrent.Inc()
defer requestsConcurrent.Dec()
dist = websocketsDist
} else {
requestsConcurrent.Inc()
defer requestsConcurrent.Dec()
dist = requestsDist
distOpts = []string{method}
}
dist = requestsDist
distOpts = []string{method}
}
next.ServeHTTP(w, r)
next.ServeHTTP(w, r)
path := rctx.RoutePattern()
distOpts = append(distOpts, path)
statusStr := strconv.Itoa(sw.Status())
path := rctx.RoutePattern()
distOpts = append(distOpts, path)
statusStr := strconv.Itoa(sw.Status())
requestsProcessed.WithLabelValues(statusStr, method, path).Inc()
dist.WithLabelValues(distOpts...).Observe(float64(time.Since(start)) / 1e6)
})
requestsProcessed.WithLabelValues(statusStr, method, path).Inc()
dist.WithLabelValues(distOpts...).Observe(float64(time.Since(start)) / 1e6)
})
}
}
func isWebsocketUpgrade(r *http.Request) bool {

View File

@ -0,0 +1,31 @@
package httpmw_test
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/coder/coder/coderd/httpmw"
)
func TestPrometheus(t *testing.T) {
t.Parallel()
t.Run("All", func(t *testing.T) {
req := httptest.NewRequest("GET", "/", nil)
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, chi.NewRouteContext()))
res := chimw.NewWrapResponseWriter(httptest.NewRecorder(), 0)
reg := prometheus.NewRegistry()
httpmw.Prometheus(reg)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})).ServeHTTP(res, req)
metrics, err := reg.Gather()
require.NoError(t, err)
require.Greater(t, len(metrics), 0)
})
}

View File

@ -0,0 +1,52 @@
package prometheusmetrics
import (
"context"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/coder/coder/coderd/database"
)
// ActiveUsers tracks the number of users that have authenticated within the past hour.
func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (context.CancelFunc, error) {
if duration == 0 {
duration = 5 * time.Minute
}
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "active_users_duration_hour",
Help: "The number of users that have been active within the last hour.",
})
err := registerer.Register(gauge)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(ctx)
ticker := time.NewTicker(duration)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
apiKeys, err := db.GetAPIKeysLastUsedAfter(ctx, database.Now().Add(-1*time.Hour))
if err != nil {
continue
}
distinctUsers := map[uuid.UUID]struct{}{}
for _, apiKey := range apiKeys {
distinctUsers[apiKey.UserID] = struct{}{}
}
gauge.Set(float64(len(distinctUsers)))
}
}()
return cancelFunc, nil
}

View File

@ -0,0 +1,93 @@
package prometheusmetrics_test
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/databasefake"
"github.com/coder/coder/coderd/prometheusmetrics"
"github.com/coder/coder/testutil"
)
func TestActiveUsers(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
Name string
Database func() database.Store
Count int
}{{
Name: "None",
Database: func() database.Store {
return databasefake.New()
},
Count: 0,
}, {
Name: "One",
Database: func() database.Store {
db := databasefake.New()
_, _ = db.InsertAPIKey(context.Background(), database.InsertAPIKeyParams{
UserID: uuid.New(),
LastUsed: database.Now(),
})
return db
},
Count: 1,
}, {
Name: "OneWithExpired",
Database: func() database.Store {
db := databasefake.New()
_, _ = db.InsertAPIKey(context.Background(), database.InsertAPIKeyParams{
UserID: uuid.New(),
LastUsed: database.Now(),
})
// Because this API key hasn't been used in the past hour, this shouldn't
// add to the user count.
_, _ = db.InsertAPIKey(context.Background(), database.InsertAPIKeyParams{
UserID: uuid.New(),
LastUsed: database.Now().Add(-2 * time.Hour),
})
return db
},
Count: 1,
}, {
Name: "Multiple",
Database: func() database.Store {
db := databasefake.New()
_, _ = db.InsertAPIKey(context.Background(), database.InsertAPIKeyParams{
UserID: uuid.New(),
LastUsed: database.Now(),
})
_, _ = db.InsertAPIKey(context.Background(), database.InsertAPIKeyParams{
UserID: uuid.New(),
LastUsed: database.Now(),
})
return db
},
Count: 2,
}} {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
registry := prometheus.NewRegistry()
cancel, err := prometheusmetrics.ActiveUsers(context.Background(), registry, tc.Database(), time.Millisecond)
require.NoError(t, err)
t.Cleanup(cancel)
var result int
require.Eventually(t, func() bool {
metrics, err := registry.Gather()
assert.NoError(t, err)
result = int(*metrics[0].Metric[0].Gauge.Value)
return result == tc.Count
}, testutil.WaitShort, testutil.IntervalFast)
})
}
}