coder/coderd/database/dbrollup/dbrollup.go

184 lines
3.9 KiB
Go

package dbrollup
import (
"context"
"flag"
"time"
"golang.org/x/sync/errgroup"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
)
// DefaultInterval is how long the rolluper waits between rollups unless
// WithInterval overrides it. Rollups are aligned to the wall clock, so
// with this value they run at 13:00, 13:05, 13:10, and so on.
const DefaultInterval = 5 * time.Minute
// Event describes what happened during a single rollup cycle. Events are
// only sent when an event channel was configured via WithEventChannel.
type Event struct {
// Init is true for the one event sent when the rolluper starts, before
// the first rollup runs. It is excluded from JSON output.
Init bool `json:"-"`
// TemplateUsageStats is true when this instance acquired the rollup
// lock and upserted template usage stats during the cycle.
TemplateUsageStats bool `json:"template_usage_stats"`
}
// Rolluper periodically runs rollup queries against the database in a
// background goroutine. Create one with New and stop it with Close.
type Rolluper struct {
// cancel cancels the context that the background goroutine runs under.
cancel context.CancelFunc
// closed is closed once the background goroutine has returned.
closed chan struct{}
// db is the store the rollup queries are executed against.
db database.Store
logger slog.Logger
// interval is the time between rollups; it defaults to DefaultInterval.
interval time.Duration
// event, when non-nil, receives an Init event on start and one Event
// after every rollup cycle that completes without error. It is only
// meant for tests.
event chan<- Event
}
// Option configures a Rolluper. Options are passed to New and applied in order.
type Option func(*Rolluper)
// WithInterval overrides the time between rollups, which otherwise
// defaults to DefaultInterval.
func WithInterval(interval time.Duration) Option {
	return func(rolluper *Rolluper) { rolluper.interval = interval }
}
// WithEventChannel makes the rolluper publish an Event on ch when it
// starts and after every completed rollup.
//
// It exists solely for tests and panics when called from a binary that
// does not have the "test.v" flag registered.
func WithEventChannel(ch chan<- Event) Option {
	// The testing package registers the "test.v" flag, so its absence
	// means we are running outside of a test binary.
	if testFlag := flag.Lookup("test.v"); testFlag == nil {
		panic("developer error: WithEventChannel is not to be used outside of tests")
	}
	return func(rolluper *Rolluper) { rolluper.event = ch }
}
// New creates a new DB rollup service that periodically runs rollup queries.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for e.g. generating insights data (template_usage_stats) from
// raw data (workspace_agent_stats, workspace_app_stats).
//
// The rollup interval defaults to DefaultInterval and can be changed with
// WithInterval. A non-positive interval is replaced by DefaultInterval,
// because the background loop resets a time.Ticker with it and
// Ticker.Reset panics for durations that are not greater than zero.
func New(logger slog.Logger, db database.Store, opts ...Option) *Rolluper {
	ctx, cancel := context.WithCancel(context.Background())

	r := &Rolluper{
		cancel:   cancel,
		closed:   make(chan struct{}),
		db:       db,
		logger:   logger,
		interval: DefaultInterval,
	}
	for _, opt := range opts {
		opt(r)
	}
	// Guard against options that configured an unusable interval; without
	// this the background goroutine would panic on its first tick.
	if r.interval <= 0 {
		r.interval = DefaultInterval
	}

	//nolint:gocritic // The system rolls up database tables without user input.
	ctx = dbauthz.AsSystemRestricted(ctx)
	go r.start(ctx)

	return r
}
// start is the background loop of the rolluper. It performs a rollup
// almost immediately and then once per interval, aligned to the wall
// clock, until ctx is canceled. r.closed is closed when it returns.
func (r *Rolluper) start(ctx context.Context) {
	defer close(r.closed)

	// emit publishes ev on the testing event channel, if one is configured.
	// It reports false when ctx was canceled before the event was received.
	emit := func(ev Event) bool {
		if r.event == nil {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case r.event <- ev:
			return true
		}
	}

	// rollup runs a single rollup cycle and logs its outcome.
	rollup := func() {
		r.logger.Debug(ctx, "rolling up data")
		started := time.Now()

		// Records which rollups ran, i.e. whether we got the advisory lock.
		var result Event

		var group errgroup.Group
		group.Go(func() error {
			return r.db.InTx(func(tx database.Store) error {
				// The lock guarantees that only a single instance
				// performs the rollup at any given time.
				acquired, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup)
				if err != nil {
					return err
				}
				if !acquired {
					return nil
				}
				result.TemplateUsageStats = true
				return tx.UpsertTemplateUsageStats(ctx)
			}, nil)
		})
		if err := group.Wait(); err != nil {
			// Stay quiet for canceled queries and once Close has been called.
			if !database.IsQueryCanceledError(err) && ctx.Err() == nil {
				r.logger.Error(ctx, "failed to rollup data", slog.Error(err))
			}
			return
		}

		r.logger.Debug(ctx,
			"rolled up data",
			slog.F("took", time.Since(started)),
			slog.F("event", result),
		)

		// For testing. Nothing is left to do if ctx was canceled meanwhile.
		_ = emit(result)
	}

	// For testing.
	if !emit(Event{Init: true}) {
		return
	}

	// The first tick fires almost immediately; from then on the ticker is
	// re-armed before every rollup, so the time a rollup takes does not
	// delay the next one. This ensures that a rollup is performed every
	// interval, assuming a rollup does not take longer than the interval.
	ticker := time.NewTicker(time.Microsecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Schedule the next tick on an interval boundary so that
		// rollups stay synced with the clock.
		now := time.Now()
		next := now.Add(r.interval).Truncate(r.interval)
		wait := next.Sub(now)
		// Safety check (shouldn't be possible).
		if wait <= 0 {
			wait = r.interval
		}
		ticker.Reset(wait)

		rollup()

		r.logger.Debug(ctx, "next rollup at", slog.F("next", next))
	}
}
// Close stops the rolluper by canceling its context and then blocks until
// the background goroutine started by New has returned. It always
// returns nil.
func (r *Rolluper) Close() error {
r.cancel()
// closed is closed by start on return, so this waits for the goroutine.
<-r.closed
return nil
}