feat: Add workspace metrics export to Prometheus (#3421)

This adds workspace totals indexed by status. It could be any
codersdk.ProvisionerJobStatus.
This commit is contained in:
Kyle Carberry 2022-08-08 20:08:42 -05:00 committed by GitHub
parent e62677efab
commit 7bdb8ff9cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 329 additions and 31 deletions

View File

@ -402,11 +402,17 @@ func server() *cobra.Command {
}
if promEnabled {
options.PrometheusRegistry = prometheus.NewRegistry()
closeFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegistry, options.Database, 0)
closeUsersFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegistry, options.Database, 0)
if err != nil {
return xerrors.Errorf("register active users prometheus metric: %w", err)
}
defer closeFunc()
defer closeUsersFunc()
closeWorkspacesFunc, err := prometheusmetrics.Workspaces(ctx, options.PrometheusRegistry, options.Database, 0)
if err != nil {
return xerrors.Errorf("register workspaces prometheus metric: %w", err)
}
defer closeWorkspacesFunc()
//nolint:revive
defer serveHandler(ctx, logger, promhttp.InstrumentMetricHandler(

View File

@ -416,6 +416,7 @@ func TestServer(t *testing.T) {
scanner := bufio.NewScanner(res.Body)
hasActiveUsers := false
hasWorkspaces := false
for scanner.Scan() {
// This metric is manually registered to be tracked in the server. That's
// why we test it's tracked here.
@ -423,9 +424,15 @@ func TestServer(t *testing.T) {
hasActiveUsers = true
continue
}
if strings.HasPrefix(scanner.Text(), "coderd_api_workspace_latest_build_total") {
hasWorkspaces = true
continue
}
t.Logf("scanned %s", scanner.Text())
}
require.NoError(t, scanner.Err())
require.True(t, hasActiveUsers)
require.True(t, hasWorkspaces)
cancelFunc()
<-serverErr
})

View File

@ -600,6 +600,32 @@ func (q *fakeQuerier) GetLatestWorkspaceBuildByWorkspaceID(_ context.Context, wo
return row, nil
}
func (q *fakeQuerier) GetLatestWorkspaceBuilds(_ context.Context) ([]database.WorkspaceBuild, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
builds := make(map[uuid.UUID]database.WorkspaceBuild)
buildNumbers := make(map[uuid.UUID]int32)
for _, workspaceBuild := range q.workspaceBuilds {
id := workspaceBuild.WorkspaceID
if workspaceBuild.BuildNumber > buildNumbers[id] {
builds[id] = workspaceBuild
buildNumbers[id] = workspaceBuild.BuildNumber
}
}
var returnBuilds []database.WorkspaceBuild
for i, n := range buildNumbers {
if n > 0 {
b := builds[i]
returnBuilds = append(returnBuilds, b)
}
}
if len(returnBuilds) == 0 {
return nil, sql.ErrNoRows
}
return returnBuilds, nil
}
func (q *fakeQuerier) GetLatestWorkspaceBuildsByWorkspaceIDs(_ context.Context, ids []uuid.UUID) ([]database.WorkspaceBuild, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()

View File

@ -1,6 +1,8 @@
package database
import "github.com/coder/coder/coderd/rbac"
import (
"github.com/coder/coder/coderd/rbac"
)
func (t Template) RBACObject() rbac.Object {
return rbac.ResourceTemplate.InOrg(t.OrganizationID).WithID(t.ID.String())

View File

@ -34,6 +34,7 @@ type querier interface {
GetFileByHash(ctx context.Context, hash string) (File, error)
GetGitSSHKey(ctx context.Context, userID uuid.UUID) (GitSSHKey, error)
GetLatestWorkspaceBuildByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) (WorkspaceBuild, error)
GetLatestWorkspaceBuilds(ctx context.Context) ([]WorkspaceBuild, error)
GetLatestWorkspaceBuildsByWorkspaceIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceBuild, error)
GetOrganizationByID(ctx context.Context, id uuid.UUID) (Organization, error)
GetOrganizationByName(ctx context.Context, name string) (Organization, error)

View File

@ -3459,6 +3459,58 @@ func (q *sqlQuerier) GetLatestWorkspaceBuildByWorkspaceID(ctx context.Context, w
return i, err
}
const getLatestWorkspaceBuilds = `-- name: GetLatestWorkspaceBuilds :many
SELECT wb.id, wb.created_at, wb.updated_at, wb.workspace_id, wb.template_version_id, wb.name, wb.build_number, wb.transition, wb.initiator_id, wb.provisioner_state, wb.job_id, wb.deadline, wb.reason
FROM (
SELECT
workspace_id, MAX(build_number) as max_build_number
FROM
workspace_builds
GROUP BY
workspace_id
) m
JOIN
workspace_builds wb
ON m.workspace_id = wb.workspace_id AND m.max_build_number = wb.build_number
`
func (q *sqlQuerier) GetLatestWorkspaceBuilds(ctx context.Context) ([]WorkspaceBuild, error) {
rows, err := q.db.QueryContext(ctx, getLatestWorkspaceBuilds)
if err != nil {
return nil, err
}
defer rows.Close()
var items []WorkspaceBuild
for rows.Next() {
var i WorkspaceBuild
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.WorkspaceID,
&i.TemplateVersionID,
&i.Name,
&i.BuildNumber,
&i.Transition,
&i.InitiatorID,
&i.ProvisionerState,
&i.JobID,
&i.Deadline,
&i.Reason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getLatestWorkspaceBuildsByWorkspaceIDs = `-- name: GetLatestWorkspaceBuildsByWorkspaceIDs :many
SELECT wb.id, wb.created_at, wb.updated_at, wb.workspace_id, wb.template_version_id, wb.name, wb.build_number, wb.transition, wb.initiator_id, wb.provisioner_state, wb.job_id, wb.deadline, wb.reason
FROM (

View File

@ -99,6 +99,19 @@ JOIN
workspace_builds wb
ON m.workspace_id = wb.workspace_id AND m.max_build_number = wb.build_number;
-- name: GetLatestWorkspaceBuilds :many
SELECT wb.*
FROM (
SELECT
workspace_id, MAX(build_number) as max_build_number
FROM
workspace_builds
GROUP BY
workspace_id
) m
JOIN
workspace_builds wb
ON m.workspace_id = wb.workspace_id AND m.max_build_number = wb.build_number;
-- name: InsertWorkspaceBuild :one
INSERT INTO

View File

@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/database"
)
@ -50,3 +51,56 @@ func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db datab
}()
return cancelFunc, nil
}
// Workspaces tracks the total number of workspaces with labels on status.
func Workspaces(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (context.CancelFunc, error) {
if duration == 0 {
duration = 5 * time.Minute
}
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "api",
Name: "workspace_latest_build_total",
Help: "The latest workspace builds with a status.",
}, []string{"status"})
err := registerer.Register(gauge)
if err != nil {
return nil, err
}
// This exists so the prometheus metric exports immediately when set.
// It helps with tests so they don't have to wait for a tick.
gauge.WithLabelValues("pending").Set(0)
ctx, cancelFunc := context.WithCancel(ctx)
ticker := time.NewTicker(duration)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
builds, err := db.GetLatestWorkspaceBuilds(ctx)
if err != nil {
continue
}
jobIDs := make([]uuid.UUID, 0, len(builds))
for _, build := range builds {
jobIDs = append(jobIDs, build.JobID)
}
jobs, err := db.GetProvisionerJobsByIDs(ctx, jobIDs)
if err != nil {
continue
}
gauge.Reset()
for _, job := range jobs {
status := coderd.ConvertProvisionerJobStatus(job)
gauge.WithLabelValues(string(status)).Add(1)
}
}
}()
return cancelFunc, nil
}

View File

@ -2,6 +2,7 @@ package prometheusmetrics_test
import (
"context"
"database/sql"
"testing"
"time"
@ -13,6 +14,7 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/databasefake"
"github.com/coder/coder/coderd/prometheusmetrics"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/testutil"
)
@ -81,13 +83,148 @@ func TestActiveUsers(t *testing.T) {
require.NoError(t, err)
t.Cleanup(cancel)
var result int
require.Eventually(t, func() bool {
metrics, err := registry.Gather()
assert.NoError(t, err)
result = int(*metrics[0].Metric[0].Gauge.Value)
result := int(*metrics[0].Metric[0].Gauge.Value)
return result == tc.Count
}, testutil.WaitShort, testutil.IntervalFast)
})
}
}
func TestWorkspaces(t *testing.T) {
t.Parallel()
insertRunning := func(db database.Store) database.ProvisionerJob {
job, _ := db.InsertProvisionerJob(context.Background(), database.InsertProvisionerJobParams{
ID: uuid.New(),
CreatedAt: database.Now(),
UpdatedAt: database.Now(),
Provisioner: database.ProvisionerTypeEcho,
})
_, _ = db.InsertWorkspaceBuild(context.Background(), database.InsertWorkspaceBuildParams{
ID: uuid.New(),
WorkspaceID: uuid.New(),
JobID: job.ID,
BuildNumber: 1,
})
// This marks the job as started.
_, _ = db.AcquireProvisionerJob(context.Background(), database.AcquireProvisionerJobParams{
StartedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
})
return job
}
insertCanceled := func(db database.Store) {
job := insertRunning(db)
_ = db.UpdateProvisionerJobWithCancelByID(context.Background(), database.UpdateProvisionerJobWithCancelByIDParams{
ID: job.ID,
CanceledAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
})
_ = db.UpdateProvisionerJobWithCompleteByID(context.Background(), database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
})
}
insertFailed := func(db database.Store) {
job := insertRunning(db)
_ = db.UpdateProvisionerJobWithCompleteByID(context.Background(), database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
Error: sql.NullString{
String: "failed",
Valid: true,
},
})
}
insertSuccess := func(db database.Store) {
job := insertRunning(db)
_ = db.UpdateProvisionerJobWithCompleteByID(context.Background(), database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
})
}
for _, tc := range []struct {
Name string
Database func() database.Store
Total int
Status map[codersdk.ProvisionerJobStatus]int
}{{
Name: "None",
Database: func() database.Store {
return databasefake.New()
},
Total: 0,
}, {
Name: "Multiple",
Database: func() database.Store {
db := databasefake.New()
insertCanceled(db)
insertFailed(db)
insertFailed(db)
insertSuccess(db)
insertSuccess(db)
insertSuccess(db)
insertRunning(db)
return db
},
Total: 7,
Status: map[codersdk.ProvisionerJobStatus]int{
codersdk.ProvisionerJobCanceled: 1,
codersdk.ProvisionerJobFailed: 2,
codersdk.ProvisionerJobSucceeded: 3,
codersdk.ProvisionerJobRunning: 1,
},
}} {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
registry := prometheus.NewRegistry()
cancel, err := prometheusmetrics.Workspaces(context.Background(), registry, tc.Database(), time.Millisecond)
require.NoError(t, err)
t.Cleanup(cancel)
require.Eventually(t, func() bool {
metrics, err := registry.Gather()
assert.NoError(t, err)
if len(metrics) < 1 {
return false
}
sum := 0
for _, metric := range metrics[0].Metric {
count, ok := tc.Status[codersdk.ProvisionerJobStatus(metric.Label[0].GetValue())]
if metric.Gauge.GetValue() == 0 {
continue
}
if !ok {
t.Fail()
}
require.Equal(t, count, int(metric.Gauge.GetValue()), "invalid count for %s", metric.Label[0].GetValue())
sum += int(metric.Gauge.GetValue())
}
t.Logf("sum %d == total %d", sum, tc.Total)
return sum == tc.Total
}, testutil.WaitShort, testutil.IntervalFast)
})
}
}

View File

@ -322,36 +322,36 @@ func convertProvisionerJob(provisionerJob database.ProvisionerJob) codersdk.Prov
if provisionerJob.WorkerID.Valid {
job.WorkerID = &provisionerJob.WorkerID.UUID
}
switch {
case provisionerJob.CanceledAt.Valid:
if provisionerJob.CompletedAt.Valid {
if job.Error == "" {
job.Status = codersdk.ProvisionerJobCanceled
} else {
job.Status = codersdk.ProvisionerJobFailed
}
} else {
job.Status = codersdk.ProvisionerJobCanceling
}
case !provisionerJob.StartedAt.Valid:
job.Status = codersdk.ProvisionerJobPending
case provisionerJob.CompletedAt.Valid:
if job.Error == "" {
job.Status = codersdk.ProvisionerJobSucceeded
} else {
job.Status = codersdk.ProvisionerJobFailed
}
case database.Now().Sub(provisionerJob.UpdatedAt) > 30*time.Second:
job.Status = codersdk.ProvisionerJobFailed
job.Error = "Worker failed to update job in time."
default:
job.Status = codersdk.ProvisionerJobRunning
}
job.Status = ConvertProvisionerJobStatus(provisionerJob)
return job
}
func ConvertProvisionerJobStatus(provisionerJob database.ProvisionerJob) codersdk.ProvisionerJobStatus {
switch {
case provisionerJob.CanceledAt.Valid:
if !provisionerJob.CompletedAt.Valid {
return codersdk.ProvisionerJobCanceling
}
if provisionerJob.Error.String == "" {
return codersdk.ProvisionerJobCanceled
}
return codersdk.ProvisionerJobFailed
case !provisionerJob.StartedAt.Valid:
return codersdk.ProvisionerJobPending
case provisionerJob.CompletedAt.Valid:
if provisionerJob.Error.String == "" {
return codersdk.ProvisionerJobSucceeded
}
return codersdk.ProvisionerJobFailed
case database.Now().Sub(provisionerJob.UpdatedAt) > 30*time.Second:
provisionerJob.Error.String = "Worker failed to update job in time."
return codersdk.ProvisionerJobFailed
default:
return codersdk.ProvisionerJobRunning
}
}
func provisionerJobLogsChannel(jobID uuid.UUID) string {
return fmt.Sprintf("provisioner-log-logs:%s", jobID)
}