feat(coderd/database): add `dbrollup` service to rollup insights (#12665)

Add `dbrollup` service that runs the `UpsertTemplateUsageStats` query
every 5 minutes, on the minute. This allows us to have fairly real-time
insights data when viewing "today".
This commit is contained in:
Mathias Fredriksson 2024-03-22 18:42:43 +02:00 committed by GitHub
parent 04f0510b09
commit 12e6fbf11e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 479 additions and 5 deletions

View File

@ -47,6 +47,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
@ -192,6 +193,9 @@ type Options struct {
// NewTicker is used for unit tests to replace "time.NewTicker".
NewTicker func(duration time.Duration) (tick <-chan time.Time, done func())
// DatabaseRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
DatabaseRolluper *dbrollup.Rolluper
// WorkspaceUsageTracker tracks workspace usage by the CLI.
WorkspaceUsageTracker *workspaceusage.Tracker
}
@ -366,6 +370,10 @@ func New(options *Options) *API {
OIDC: options.OIDCConfig,
}
if options.DatabaseRolluper == nil {
options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database)
}
if options.WorkspaceUsageTracker == nil {
options.WorkspaceUsageTracker = workspaceusage.New(options.Database,
workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")),
@ -414,7 +422,9 @@ func New(options *Options) *API {
ctx,
options.Logger.Named("acquirer"),
options.Database,
options.Pubsub),
options.Pubsub,
),
dbRolluper: options.DatabaseRolluper,
workspaceUsageTracker: options.WorkspaceUsageTracker,
}
@ -1197,7 +1207,9 @@ type API struct {
statsBatcher *batchstats.Batcher
Acquirer *provisionerdserver.Acquirer
// dbRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
dbRolluper *dbrollup.Rolluper
workspaceUsageTracker *workspaceusage.Tracker
}
@ -1212,6 +1224,7 @@ func (api *API) Close() error {
api.WebsocketWaitGroup.Wait()
api.WebsocketWaitMutex.Unlock()
api.dbRolluper.Close()
api.metricsCache.Close()
if api.updateChecker != nil {
api.updateChecker.Close()

View File

@ -57,6 +57,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
@ -147,6 +148,7 @@ type Options struct {
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
AllowWorkspaceRenames bool
NewTicker func(duration time.Duration) (<-chan time.Time, func())
DatabaseRolluper *dbrollup.Rolluper
WorkspaceUsageTrackerFlush chan int
WorkspaceUsageTrackerTick chan time.Time
}
@ -491,6 +493,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
NewTicker: options.NewTicker,
DatabaseRolluper: options.DatabaseRolluper,
WorkspaceUsageTracker: wuTracker,
}
}

View File

@ -489,6 +489,38 @@ func WorkspaceApp(t testing.TB, db database.Store, orig database.WorkspaceApp) d
return resource
}
// WorkspaceAppStat inserts a single workspace app stat row built from
// orig, filling any zero fields with generated defaults, and returns
// the row as inserted. Requires the referenced user/workspace/agent to
// exist when running against a real database.
func WorkspaceAppStat(t testing.TB, db database.Store, orig database.WorkspaceAppStat) database.WorkspaceAppStat {
	// This is not going to be correct, but our query doesn't return the ID.
	id, err := cryptorand.Int63()
	require.NoError(t, err, "generate id")
	scheme := database.WorkspaceAppStat{
		ID:               takeFirst(orig.ID, id),
		UserID:           takeFirst(orig.UserID, uuid.New()),
		WorkspaceID:      takeFirst(orig.WorkspaceID, uuid.New()),
		AgentID:          takeFirst(orig.AgentID, uuid.New()),
		AccessMethod:     takeFirst(orig.AccessMethod, ""),
		SlugOrPort:       takeFirst(orig.SlugOrPort, ""),
		SessionID:        takeFirst(orig.SessionID, uuid.New()),
		SessionStartedAt: takeFirst(orig.SessionStartedAt, dbtime.Now().Add(-time.Minute)),
		SessionEndedAt:   takeFirst(orig.SessionEndedAt, dbtime.Now()),
		Requests:         takeFirst(orig.Requests, 1),
	}
	// The insert is a batch query; wrap the single row in slices.
	err = db.InsertWorkspaceAppStats(genCtx, database.InsertWorkspaceAppStatsParams{
		UserID:           []uuid.UUID{scheme.UserID},
		WorkspaceID:      []uuid.UUID{scheme.WorkspaceID},
		AgentID:          []uuid.UUID{scheme.AgentID},
		AccessMethod:     []string{scheme.AccessMethod},
		SlugOrPort:       []string{scheme.SlugOrPort},
		SessionID:        []uuid.UUID{scheme.SessionID},
		SessionStartedAt: []time.Time{scheme.SessionStartedAt},
		SessionEndedAt:   []time.Time{scheme.SessionEndedAt},
		Requests:         []int32{scheme.Requests},
	})
	// Fixed message: this inserts a workspace *app* stat, not an agent stat.
	require.NoError(t, err, "insert workspace app stat")
	return scheme
}
func WorkspaceResource(t testing.TB, db database.Store, orig database.WorkspaceResource) database.WorkspaceResource {
resource, err := db.InsertWorkspaceResource(genCtx, database.InsertWorkspaceResourceParams{
ID: takeFirst(orig.ID, uuid.New()),

View File

@ -24,6 +24,7 @@ const (
// This is for cleaning up old, unused resources from the database that take up space.
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
closed := make(chan struct{})
logger = logger.Named("dbpurge")
ctx, cancelFunc := context.WithCancel(ctx)
//nolint:gocritic // The system purges old db records without user input.

View File

@ -0,0 +1,173 @@
package dbrollup
import (
"context"
"flag"
"time"
"golang.org/x/sync/errgroup"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
)
const (
	// DefaultInterval is the default time between rollups.
	// Rollups will be synchronized with the clock so that
	// they happen 13:00, 13:05, 13:10, etc.
	DefaultInterval = 5 * time.Minute
)

// Event reports which rollups were performed during a single run.
// It is delivered on the channel set via WithEventChannel (tests only).
type Event struct {
	// TemplateUsageStats is true when this instance acquired the
	// advisory lock and ran the template usage stats rollup.
	TemplateUsageStats bool
}

// Rolluper periodically rolls up raw stats into aggregate tables.
// Create one with New and stop it with Close.
type Rolluper struct {
	cancel   context.CancelFunc
	closed   chan struct{} // closed when the background loop exits
	db       database.Store
	logger   slog.Logger
	interval time.Duration
	event    chan<- Event // test-only notification channel; may be nil
}

// Option configures a Rolluper created by New.
type Option func(*Rolluper)
// WithInterval sets the duration between rollup runs, overriding
// DefaultInterval.
func WithInterval(d time.Duration) Option {
	return func(r *Rolluper) { r.interval = d }
}
// WithEventChannel sets the channel that receives an Event after every
// rollup attempt.
//
// This is only used for testing.
func WithEventChannel(ch chan<- Event) Option {
	// Guard against accidental production use: the events hook is a
	// test-only seam, so require the test flag to be registered.
	if flag.Lookup("test.v") == nil {
		panic("developer error: WithEventChannel is not to be used outside of tests")
	}
	return func(r *Rolluper) { r.event = ch }
}
// New creates a new DB rollup service that periodically runs rollup queries.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for e.g. generating insights data (template_usage_stats) from
// raw data (workspace_agent_stats, workspace_app_stats).
func New(logger slog.Logger, db database.Store, opts ...Option) *Rolluper {
	ctx, cancel := context.WithCancel(context.Background())

	rolluper := &Rolluper{
		cancel:   cancel,
		closed:   make(chan struct{}),
		db:       db,
		logger:   logger,
		interval: DefaultInterval,
	}
	for _, o := range opts {
		o(rolluper)
	}

	//nolint:gocritic // The system rolls up database tables without user input.
	go rolluper.start(dbauthz.AsSystemRestricted(ctx))

	return rolluper
}
// start runs the rollup loop until ctx is canceled. The first run
// happens almost immediately (microsecond ticker); every subsequent
// run is re-aligned with the wall clock so rollups land on interval
// boundaries (e.g. 13:00, 13:05, ...).
func (r *Rolluper) start(ctx context.Context) {
	defer close(r.closed)

	// do performs a single rollup attempt.
	do := func() {
		var eg errgroup.Group

		r.logger.Debug(ctx, "rolling up data")
		now := time.Now()

		// Track whether or not we performed a rollup (we got the advisory lock).
		var ev Event

		eg.Go(func() error {
			return r.db.InTx(func(tx database.Store) error {
				// Acquire a lock to ensure that only one instance of
				// the rollup is running at a time.
				ok, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup)
				if err != nil {
					return err
				}
				if !ok {
					// Another instance holds the lock; skip quietly.
					return nil
				}

				ev.TemplateUsageStats = true
				return tx.UpsertTemplateUsageStats(ctx)
			}, nil)
		})

		err := eg.Wait()
		if err != nil {
			if database.IsQueryCanceledError(err) {
				return
			}
			// Only log if Close hasn't been called.
			if ctx.Err() == nil {
				r.logger.Error(ctx, "failed to rollup data", slog.Error(err))
			}
			return
		}

		r.logger.Debug(ctx,
			"rolled up data",
			slog.F("took", time.Since(now)),
			slog.F("event", ev),
		)

		// For testing: report what happened, but never block shutdown.
		if r.event != nil {
			select {
			case <-ctx.Done():
				return
			case r.event <- ev:
			}
		}
	}

	// Perform do immediately and on every tick of the ticker,
	// disregarding the execution time of do. This ensures that
	// the rollup is performed every interval assuming do does
	// not take longer than the interval to execute.
	t := time.NewTicker(time.Microsecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// Ensure we're on the interval.
			now := time.Now()
			next := now.Add(r.interval).Truncate(r.interval) // Ensure we're on the interval and synced with the clock.
			d := next.Sub(now)
			// Safety check (shouldn't be possible).
			if d <= 0 {
				d = r.interval
			}
			t.Reset(d)

			do()

			r.logger.Debug(ctx, "next rollup at", slog.F("next", next))
		}
	}
}
// Close cancels the rollup loop and blocks until the background
// goroutine has exited. It always returns nil and is safe to call once.
func (r *Rolluper) Close() error {
	r.cancel()
	// Wait for start to observe cancellation and close r.closed.
	<-r.closed
	return nil
}

View File

@ -0,0 +1,253 @@
package dbrollup_test
import (
"context"
"database/sql"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/testutil"
)
// TestMain fails the package's tests if any test leaks a goroutine,
// e.g. a Rolluper that was not closed.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
// TestRollup_Close verifies that a freshly started Rolluper shuts down
// cleanly; goroutine leaks are caught by goleak in TestMain.
func TestRollup_Close(t *testing.T) {
	t.Parallel()
	rolluper := dbrollup.New(slogtest.Make(t, nil), dbmem.New(), dbrollup.WithInterval(250*time.Millisecond))
	err := rolluper.Close()
	require.NoError(t, err)
}
// wrapUpsertDB wraps a database.Store so that UpsertTemplateUsageStats
// blocks until the resume channel yields (or is closed), letting tests
// control exactly when a rollup is allowed to complete.
type wrapUpsertDB struct {
	database.Store
	resume <-chan struct{}
}

// InTx re-wraps the transaction store so the blocking
// UpsertTemplateUsageStats override also applies inside transactions.
func (w *wrapUpsertDB) InTx(fn func(database.Store) error, opts *sql.TxOptions) error {
	return w.Store.InTx(func(tx database.Store) error {
		return fn(&wrapUpsertDB{Store: tx, resume: w.resume})
	}, opts)
}

// UpsertTemplateUsageStats waits for the resume signal before
// delegating to the underlying store.
func (w *wrapUpsertDB) UpsertTemplateUsageStats(ctx context.Context) error {
	<-w.resume
	return w.Store.UpsertTemplateUsageStats(ctx)
}
// TestRollup_TwoInstancesUseLocking runs two rollup services against
// the same PostgreSQL database and verifies that the advisory lock
// makes exactly one of them perform the rollup.
func TestRollup_TwoInstancesUseLocking(t *testing.T) {
	t.Parallel()

	if !dbtestutil.WillUsePostgres() {
		t.Skip("Skipping test; only works with PostgreSQL.")
	}

	db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug)

	// Minimal fixture: a workspace with an agent so there is agent
	// stat data for the rollup to aggregate.
	var (
		org   = dbgen.Organization(t, db, database.Organization{})
		user  = dbgen.User(t, db, database.User{Name: "user1"})
		tpl   = dbgen.Template(t, db, database.Template{OrganizationID: org.ID, CreatedBy: user.ID})
		ver   = dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, TemplateID: uuid.NullUUID{UUID: tpl.ID, Valid: true}, CreatedBy: user.ID})
		ws    = dbgen.Workspace(t, db, database.Workspace{OrganizationID: org.ID, TemplateID: tpl.ID, OwnerID: user.ID})
		job   = dbgen.ProvisionerJob(t, db, ps, database.ProvisionerJob{OrganizationID: org.ID})
		build = dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{WorkspaceID: ws.ID, JobID: job.ID, TemplateVersionID: ver.ID})
		res   = dbgen.WorkspaceResource(t, db, database.WorkspaceResource{JobID: build.JobID})
		agent = dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ResourceID: res.ID})
	)

	refTime := dbtime.Now().Truncate(time.Hour)
	_ = dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
		TemplateID:                tpl.ID,
		WorkspaceID:               ws.ID,
		AgentID:                   agent.ID,
		UserID:                    user.ID,
		CreatedAt:                 refTime.Add(-time.Minute),
		ConnectionMedianLatencyMS: 1,
		ConnectionCount:           1,
		SessionCountSSH:           1,
	})

	// closeRolluper unblocks the wrapped store first so Close cannot
	// deadlock on a rollup stuck in UpsertTemplateUsageStats.
	closeRolluper := func(rolluper *dbrollup.Rolluper, resume chan struct{}) {
		close(resume)
		err := rolluper.Close()
		require.NoError(t, err)
	}

	interval := dbrollup.WithInterval(250 * time.Millisecond)
	events1 := make(chan dbrollup.Event)
	resume1 := make(chan struct{}, 1)
	rolluper1 := dbrollup.New(
		logger.Named("dbrollup1"),
		&wrapUpsertDB{Store: db, resume: resume1},
		interval,
		dbrollup.WithEventChannel(events1),
	)
	defer closeRolluper(rolluper1, resume1)

	events2 := make(chan dbrollup.Event)
	resume2 := make(chan struct{}, 1)
	rolluper2 := dbrollup.New(
		logger.Named("dbrollup2"),
		&wrapUpsertDB{Store: db, resume: resume2},
		interval,
		dbrollup.WithEventChannel(events2),
	)
	defer closeRolluper(rolluper2, resume2)

	ctx := testutil.Context(t, testutil.WaitMedium)

	// One of the rollup instances should roll up and the other should not.
	var ev1, ev2 dbrollup.Event
	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for rollup to occur")
	case ev1 = <-events1:
		// Unblock the other instance's pending upsert, then wait
		// for its event.
		resume2 <- struct{}{}
		ev2 = <-events2
	case ev2 = <-events2:
		resume1 <- struct{}{}
		ev1 = <-events1
	}

	require.NotEqual(t, ev1, ev2, "one of the rollup instances should have rolled up and the other not")

	// Exactly one aggregate row should exist for the fixture data.
	rows, err := db.GetTemplateUsageStats(ctx, database.GetTemplateUsageStatsParams{
		StartTime: refTime.Add(-time.Hour).Truncate(time.Hour),
		EndTime:   refTime,
	})
	require.NoError(t, err)
	require.Len(t, rows, 1)
}
// TestRollupTemplateUsageStats verifies that agent and app stats within
// the retention window are aggregated into template_usage_stats with
// the expected usage minutes, while rows older than six months are
// excluded.
func TestRollupTemplateUsageStats(t *testing.T) {
	t.Parallel()

	db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)

	anHourAgo := dbtime.Now().Add(-time.Hour).Truncate(time.Hour)
	anHourAndSixMonthsAgo := anHourAgo.AddDate(0, -6, 0)

	// Fixture: workspace, agent, and app that the stats reference.
	var (
		org   = dbgen.Organization(t, db, database.Organization{})
		user  = dbgen.User(t, db, database.User{Name: "user1"})
		tpl   = dbgen.Template(t, db, database.Template{OrganizationID: org.ID, CreatedBy: user.ID})
		ver   = dbgen.TemplateVersion(t, db, database.TemplateVersion{OrganizationID: org.ID, TemplateID: uuid.NullUUID{UUID: tpl.ID, Valid: true}, CreatedBy: user.ID})
		ws    = dbgen.Workspace(t, db, database.Workspace{OrganizationID: org.ID, TemplateID: tpl.ID, OwnerID: user.ID})
		job   = dbgen.ProvisionerJob(t, db, ps, database.ProvisionerJob{OrganizationID: org.ID})
		build = dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{WorkspaceID: ws.ID, JobID: job.ID, TemplateVersionID: ver.ID})
		res   = dbgen.WorkspaceResource(t, db, database.WorkspaceResource{JobID: build.JobID})
		agent = dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ResourceID: res.ID})
		app   = dbgen.WorkspaceApp(t, db, database.WorkspaceApp{AgentID: agent.ID})
	)

	// Stats inserted 6 months + 1 day ago, should be excluded.
	_ = dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
		TemplateID:                tpl.ID,
		WorkspaceID:               ws.ID,
		AgentID:                   agent.ID,
		UserID:                    user.ID,
		CreatedAt:                 anHourAndSixMonthsAgo.AddDate(0, 0, -1),
		ConnectionMedianLatencyMS: 1,
		ConnectionCount:           1,
		SessionCountSSH:           1,
	})
	_ = dbgen.WorkspaceAppStat(t, db, database.WorkspaceAppStat{
		UserID:           user.ID,
		WorkspaceID:      ws.ID,
		AgentID:          agent.ID,
		SessionStartedAt: anHourAndSixMonthsAgo.AddDate(0, 0, -1),
		SessionEndedAt:   anHourAndSixMonthsAgo.AddDate(0, 0, -1).Add(time.Minute),
		SlugOrPort:       app.Slug,
	})

	// Stats inserted 6 months - 1 day ago, should be rolled up.
	wags1 := dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
		TemplateID:                  tpl.ID,
		WorkspaceID:                 ws.ID,
		AgentID:                     agent.ID,
		UserID:                      user.ID,
		CreatedAt:                   anHourAndSixMonthsAgo.AddDate(0, 0, 1),
		ConnectionMedianLatencyMS:   1,
		ConnectionCount:             1,
		SessionCountReconnectingPTY: 1,
	})
	wags2 := dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
		TemplateID:                  tpl.ID,
		WorkspaceID:                 ws.ID,
		AgentID:                     agent.ID,
		UserID:                      user.ID,
		CreatedAt:                   wags1.CreatedAt.Add(time.Minute),
		ConnectionMedianLatencyMS:   1,
		ConnectionCount:             1,
		SessionCountReconnectingPTY: 1,
	})
	// wags2 and waps1 overlap, so total usage is 4 - 1.
	waps1 := dbgen.WorkspaceAppStat(t, db, database.WorkspaceAppStat{
		UserID:           user.ID,
		WorkspaceID:      ws.ID,
		AgentID:          agent.ID,
		SessionStartedAt: wags2.CreatedAt,
		SessionEndedAt:   wags2.CreatedAt.Add(time.Minute),
		SlugOrPort:       app.Slug,
	})
	waps2 := dbgen.WorkspaceAppStat(t, db, database.WorkspaceAppStat{
		UserID:           user.ID,
		WorkspaceID:      ws.ID,
		AgentID:          agent.ID,
		SessionStartedAt: waps1.SessionEndedAt,
		SessionEndedAt:   waps1.SessionEndedAt.Add(time.Minute),
		SlugOrPort:       app.Slug,
	})
	_ = waps2 // Keep the name for documentation.

	// The data is already present, so we can rely on initial rollup to occur.
	events := make(chan dbrollup.Event, 1)
	rolluper := dbrollup.New(logger, db, dbrollup.WithInterval(250*time.Millisecond), dbrollup.WithEventChannel(events))
	defer rolluper.Close()

	ctx := testutil.Context(t, testutil.WaitMedium)

	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for rollup to occur")
	case ev := <-events:
		require.True(t, ev.TemplateUsageStats, "expected template usage stats to be rolled up")
	}

	stats, err := db.GetTemplateUsageStats(ctx, database.GetTemplateUsageStatsParams{
		StartTime: anHourAndSixMonthsAgo.Add(-time.Minute),
		EndTime:   anHourAgo,
	})
	require.NoError(t, err)
	require.Len(t, stats, 1)
	// Expected aggregate: UsageMins 3 (overlap deduplicated),
	// ReconnectingPtyMins 2 (wags1+wags2), AppUsageMins 2 (waps1+waps2).
	require.Equal(t, database.TemplateUsageStat{
		TemplateID:          tpl.ID,
		UserID:              user.ID,
		StartTime:           wags1.CreatedAt,
		EndTime:             wags1.CreatedAt.Add(30 * time.Minute),
		MedianLatencyMs:     sql.NullFloat64{Float64: 1, Valid: true},
		UsageMins:           3,
		ReconnectingPtyMins: 2,
		AppUsageMins: database.StringMapOfInt{
			app.Slug: 2,
		},
	}, stats[0])
}

View File

@ -6,10 +6,9 @@ import "hash/fnv"
// change. If locks are deprecated, they should be kept in this list to avoid
// reusing the same ID.
const (
	// Start at iota + 1 so no lock uses the zero value.
	// LockIDDeploymentSetup is the advisory lock for one-time
	// deployment setup.
	LockIDDeploymentSetup = iota + 1
	// LockIDEnterpriseDeploymentSetup guards enterprise deployment
	// setup.
	LockIDEnterpriseDeploymentSetup
	// LockIDDBRollup guards the dbrollup service so only one
	// instance rolls up stats at a time.
	LockIDDBRollup
)
// GenLockID generates a unique and consistent lock ID from a given string.