feat(coderd): batch agent stats inserts (#8875)

This PR adds support for batching inserts to the workspace_agents_stats table.
Up to 1024 stats are buffered, and the buffer is flushed to the database once per second, or sooner if it nears capacity.
Cian Johnston, 2023-08-04 17:00:42 +01:00, committed by GitHub
parent ae88b79fd7
commit 9fb18f3ae5
14 changed files with 785 additions and 35 deletions
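
At a glance, the flow this commit introduces (a condensed sketch assembled from the hunks below; error handling and surrounding server plumbing are trimmed):

// At server startup, construct the batcher and hand it to coderd:
batcher, closeBatcher, err := batchstats.New(ctx,
	batchstats.WithLogger(options.Logger.Named("batchstats")),
	batchstats.WithStore(options.Database),
)
if err != nil {
	return xerrors.Errorf("failed to create agent stats batcher: %w", err)
}
options.StatsBatcher = batcher
defer closeBatcher() // cancels the batcher's context, triggering a final flush

// The agent stats endpoint then enqueues rather than inserting synchronously:
err = api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req)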

View File

@@ -63,6 +63,7 @@ import (
"github.com/coder/coder/cli/config"
"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/autobuild"
"github.com/coder/coder/coderd/batchstats"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbfake"
"github.com/coder/coder/coderd/database/dbmetrics"
@@ -813,6 +814,16 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
}
batcher, closeBatcher, err := batchstats.New(ctx,
batchstats.WithLogger(options.Logger.Named("batchstats")),
batchstats.WithStore(options.Database),
)
if err != nil {
return xerrors.Errorf("failed to create agent stats batcher: %w", err)
}
options.StatsBatcher = batcher
defer closeBatcher()
closeCheckInactiveUsersFunc := dormancy.CheckInactiveUsers(ctx, logger, options.Database)
defer closeCheckInactiveUsersFunc()

View File

@@ -0,0 +1,289 @@
package batchstats
import (
"context"
"encoding/json"
"os"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/codersdk/agentsdk"
)
const (
defaultBufferSize = 1024
defaultFlushInterval = time.Second
)
// Batcher holds a buffer of agent stats and periodically flushes them to
// its configured store.
type Batcher struct {
store database.Store
log slog.Logger
mu sync.Mutex
// TODO: make this a buffered chan instead?
buf *database.InsertWorkspaceAgentStatsParams
// NOTE: we batch this separately as it's a jsonb field and
// pq.Array + unnest doesn't play nicely with this.
connectionsByProto []map[string]int64
batchSize int
// tickCh is used to periodically flush the buffer.
tickCh <-chan time.Time
ticker *time.Ticker
interval time.Duration
// flushLever is used to signal the flusher to flush the buffer immediately.
flushLever chan struct{}
flushForced atomic.Bool
// flushed is used during testing to signal that a flush has completed.
flushed chan<- int
}
// Option is a functional option for configuring a Batcher.
type Option func(b *Batcher)
// WithStore sets the store to use for storing stats.
func WithStore(store database.Store) Option {
return func(b *Batcher) {
b.store = store
}
}
// WithBatchSize sets the number of stats to store in a batch.
func WithBatchSize(size int) Option {
return func(b *Batcher) {
b.batchSize = size
}
}
// WithInterval sets the interval for flushes.
func WithInterval(d time.Duration) Option {
return func(b *Batcher) {
b.interval = d
}
}
// WithLogger sets the logger to use for logging.
func WithLogger(log slog.Logger) Option {
return func(b *Batcher) {
b.log = log
}
}
// New creates a new Batcher and starts it.
func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {
b := &Batcher{}
b.log = slog.Make(sloghuman.Sink(os.Stderr))
b.flushLever = make(chan struct{}, 1) // Buffered so that it doesn't block.
for _, opt := range opts {
opt(b)
}
if b.store == nil {
return nil, nil, xerrors.Errorf("no store configured for batcher")
}
if b.interval == 0 {
b.interval = defaultFlushInterval
}
if b.batchSize == 0 {
b.batchSize = defaultBufferSize
}
// Initialize the buffer before the run loop starts so that a concurrent
// Add never observes a nil buffer.
b.initBuf(b.batchSize)
if b.tickCh == nil {
b.ticker = time.NewTicker(b.interval)
b.tickCh = b.ticker.C
}
cancelCtx, cancelFunc := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
b.run(cancelCtx)
close(done)
}()
closer := func() {
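// Cancelling the context stops the run loop, which attempts one final
// flush before done is closed.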
cancelFunc()
if b.ticker != nil {
b.ticker.Stop()
}
<-done
}
return b, closer, nil
}
// Add adds a stat to the batcher for the given workspace and agent.
func (b *Batcher) Add(
agentID uuid.UUID,
templateID uuid.UUID,
userID uuid.UUID,
workspaceID uuid.UUID,
st agentsdk.Stats,
) error {
b.mu.Lock()
defer b.mu.Unlock()
now := database.Now()
b.buf.ID = append(b.buf.ID, uuid.New())
b.buf.CreatedAt = append(b.buf.CreatedAt, now)
b.buf.AgentID = append(b.buf.AgentID, agentID)
b.buf.UserID = append(b.buf.UserID, userID)
b.buf.TemplateID = append(b.buf.TemplateID, templateID)
b.buf.WorkspaceID = append(b.buf.WorkspaceID, workspaceID)
// Store the connections by proto separately as it's a jsonb field. We marshal on flush.
// b.buf.ConnectionsByProto = append(b.buf.ConnectionsByProto, st.ConnectionsByProto)
b.connectionsByProto = append(b.connectionsByProto, st.ConnectionsByProto)
b.buf.ConnectionCount = append(b.buf.ConnectionCount, st.ConnectionCount)
b.buf.RxPackets = append(b.buf.RxPackets, st.RxPackets)
b.buf.RxBytes = append(b.buf.RxBytes, st.RxBytes)
b.buf.TxPackets = append(b.buf.TxPackets, st.TxPackets)
b.buf.TxBytes = append(b.buf.TxBytes, st.TxBytes)
b.buf.SessionCountVSCode = append(b.buf.SessionCountVSCode, st.SessionCountVSCode)
b.buf.SessionCountJetBrains = append(b.buf.SessionCountJetBrains, st.SessionCountJetBrains)
b.buf.SessionCountReconnectingPTY = append(b.buf.SessionCountReconnectingPTY, st.SessionCountReconnectingPTY)
b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSSH)
b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMS)
// If the buffer is over 80% full, signal the flusher to flush immediately.
// We want to trigger flushes early to reduce the likelihood of
// accidentally growing the buffer over batchSize.
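// With the default batch size of 1024, the lever fires on the 820th
// buffered stat: 819/1024 is just under 0.8, and 820/1024 crosses it.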
filled := float64(len(b.buf.ID)) / float64(b.batchSize)
if filled >= 0.8 && !b.flushForced.Load() {
b.flushLever <- struct{}{}
b.flushForced.Store(true)
}
return nil
}
// run is the main loop of the Batcher: it flushes on every tick, on demand
// via the flush lever, and once more on shutdown.
func (b *Batcher) run(ctx context.Context) {
// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
authCtx := dbauthz.AsSystemRestricted(ctx)
for {
select {
case <-b.tickCh:
b.flush(authCtx, false, "scheduled")
case <-b.flushLever:
// If the flush lever is depressed, flush the buffer immediately.
b.flush(authCtx, true, "reaching capacity")
case <-ctx.Done():
b.log.Warn(ctx, "context done, flushing before exit")
b.flush(authCtx, true, "exit")
return
}
}
}
// flush flushes the batcher's buffer.
func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
b.mu.Lock()
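// Disarm the flush lever while a flush is in flight; the deferred
// Store(false) below re-arms it.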
b.flushForced.Store(true)
start := time.Now()
count := len(b.buf.ID)
defer func() {
b.flushForced.Store(false)
b.mu.Unlock()
// Notify that a flush has completed. This only happens in tests.
if b.flushed != nil {
select {
case <-ctx.Done():
close(b.flushed)
default:
b.flushed <- count
}
}
if count > 0 {
elapsed := time.Since(start)
b.log.Debug(ctx, "flush complete",
slog.F("count", count),
slog.F("elapsed", elapsed),
slog.F("forced", forced),
slog.F("reason", reason),
)
}
}()
if len(b.buf.ID) == 0 {
return
}
// marshal connections by proto
payload, err := json.Marshal(b.connectionsByProto)
if err != nil {
b.log.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err))
b.buf.ConnectionsByProto = json.RawMessage(`[]`)
} else {
b.buf.ConnectionsByProto = payload
}
err = b.store.InsertWorkspaceAgentStats(ctx, *b.buf)
elapsed := time.Since(start)
if err != nil {
b.log.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed))
return
}
b.resetBuf()
}
// initBuf initializes the buffer with the given capacity. It is called
// once from New, before the run loop starts and before b is shared.
func (b *Batcher) initBuf(size int) {
b.buf = &database.InsertWorkspaceAgentStatsParams{
ID: make([]uuid.UUID, 0, size),
CreatedAt: make([]time.Time, 0, size),
UserID: make([]uuid.UUID, 0, size),
WorkspaceID: make([]uuid.UUID, 0, size),
TemplateID: make([]uuid.UUID, 0, size),
AgentID: make([]uuid.UUID, 0, size),
ConnectionsByProto: json.RawMessage("[]"),
ConnectionCount: make([]int64, 0, size),
RxPackets: make([]int64, 0, size),
RxBytes: make([]int64, 0, size),
TxPackets: make([]int64, 0, size),
TxBytes: make([]int64, 0, size),
SessionCountVSCode: make([]int64, 0, size),
SessionCountJetBrains: make([]int64, 0, size),
SessionCountReconnectingPTY: make([]int64, 0, size),
SessionCountSSH: make([]int64, 0, size),
ConnectionMedianLatencyMS: make([]float64, 0, size),
}
b.connectionsByProto = make([]map[string]int64, 0, size)
}
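// resetBuf resets the buffer in place, truncating each slice to zero length
// so the backing arrays are reused across flushes. b MUST be locked.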
func (b *Batcher) resetBuf() {
b.buf.ID = b.buf.ID[:0]
b.buf.CreatedAt = b.buf.CreatedAt[:0]
b.buf.UserID = b.buf.UserID[:0]
b.buf.WorkspaceID = b.buf.WorkspaceID[:0]
b.buf.TemplateID = b.buf.TemplateID[:0]
b.buf.AgentID = b.buf.AgentID[:0]
b.buf.ConnectionsByProto = json.RawMessage(`[]`)
b.buf.ConnectionCount = b.buf.ConnectionCount[:0]
b.buf.RxPackets = b.buf.RxPackets[:0]
b.buf.RxBytes = b.buf.RxBytes[:0]
b.buf.TxPackets = b.buf.TxPackets[:0]
b.buf.TxBytes = b.buf.TxBytes[:0]
b.buf.SessionCountVSCode = b.buf.SessionCountVSCode[:0]
b.buf.SessionCountJetBrains = b.buf.SessionCountJetBrains[:0]
b.buf.SessionCountReconnectingPTY = b.buf.SessionCountReconnectingPTY[:0]
b.buf.SessionCountSSH = b.buf.SessionCountSSH[:0]
b.buf.ConnectionMedianLatencyMS = b.buf.ConnectionMedianLatencyMS[:0]
b.connectionsByProto = b.connectionsByProto[:0]
}

View File

@@ -0,0 +1,226 @@
package batchstats
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbgen"
"github.com/coder/coder/coderd/database/dbtestutil"
"github.com/coder/coder/coderd/rbac"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/cryptorand"
)
func TestBatchStats(t *testing.T) {
t.Parallel()
// Given: a fresh batcher with no data
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, _ := dbtestutil.NewDB(t)
// Set up some test dependencies.
deps1 := setupDeps(t, store)
deps2 := setupDeps(t, store)
tick := make(chan time.Time)
flushed := make(chan int)
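// Inject a manual tick channel and a flushed-notification channel so the
// test controls exactly when flushes happen and can observe their sizes.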
b, closer, err := New(ctx,
WithStore(store),
WithLogger(log),
func(b *Batcher) {
b.tickCh = tick
b.flushed = flushed
},
)
require.NoError(t, err)
t.Cleanup(closer)
// Given: no data points are added for workspace
// When: it becomes time to report stats
t1 := time.Now()
// Signal a tick and wait for a flush to complete.
tick <- t1
f := <-flushed
require.Equal(t, 0, f, "expected no data to be flushed")
t.Logf("flush 1 completed")
// Then: it should report no stats.
stats, err := store.GetWorkspaceAgentStats(ctx, t1)
require.NoError(t, err, "should not error getting stats")
require.Empty(t, stats, "should have no stats for workspace")
// Given: a single data point is added for workspace
t2 := time.Now()
t.Logf("inserting 1 stat")
require.NoError(t, b.Add(deps1.Agent.ID, deps1.Template.ID, deps1.User.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
// When: it becomes time to report stats
// Signal a tick and wait for a flush to complete.
tick <- t2
f = <-flushed // Wait for a flush to complete.
require.Equal(t, 1, f, "expected one stat to be flushed")
t.Logf("flush 2 completed")
// Then: it should report a single stat.
stats, err = store.GetWorkspaceAgentStats(ctx, t2)
require.NoError(t, err, "should not error getting stats")
require.Len(t, stats, 1, "should have stats for workspace")
// Given: a lot of data points are added for both workspaces
// (equal to batch size)
t3 := time.Now()
done := make(chan struct{})
go func() {
defer close(done)
t.Logf("inserting %d stats", defaultBufferSize)
for i := 0; i < defaultBufferSize; i++ {
if i%2 == 0 {
require.NoError(t, b.Add(deps1.Agent.ID, deps1.Template.ID, deps1.User.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
} else {
require.NoError(t, b.Add(deps2.Agent.ID, deps2.Template.ID, deps2.User.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
}
}
}()
// When: the buffer comes close to capacity
// Then: The buffer will force-flush once.
f = <-flushed
t.Logf("flush 3 completed")
require.Greater(t, f, 819, "expected more than 819 stats to be flushed (>=80% of buffer)")
// And we should finish inserting the stats
<-done
stats, err = store.GetWorkspaceAgentStats(ctx, t3)
require.NoError(t, err, "should not error getting stats")
require.Len(t, stats, 2, "should have stats for both workspaces")
// Ensures that a subsequent flush pushes all the remaining data
t4 := time.Now()
tick <- t4
f2 := <-flushed
t.Logf("flush 4 completed")
expectedCount := defaultBufferSize - f
require.Equal(t, expectedCount, f2, "did not flush expected remaining rows")
// Ensure that a subsequent flush does not push stale data.
t5 := time.Now()
tick <- t5
f = <-flushed
require.Zero(t, f, "expected zero stats to have been flushed")
t.Logf("flush 5 completed")
stats, err = store.GetWorkspaceAgentStats(ctx, t5)
require.NoError(t, err, "should not error getting stats")
require.Len(t, stats, 0, "should have no stats for workspace")
// Ensure that buf never grew beyond what we expect
require.Equal(t, defaultBufferSize, cap(b.buf.ID), "buffer grew beyond expected capacity")
}
// randAgentSDKStats returns a random agentsdk.Stats
func randAgentSDKStats(t *testing.T, opts ...func(*agentsdk.Stats)) agentsdk.Stats {
t.Helper()
s := agentsdk.Stats{
ConnectionsByProto: map[string]int64{
"ssh": mustRandInt64n(t, 9) + 1,
"vscode": mustRandInt64n(t, 9) + 1,
"jetbrains": mustRandInt64n(t, 9) + 1,
"reconnecting_pty": mustRandInt64n(t, 9) + 1,
},
ConnectionCount: mustRandInt64n(t, 99) + 1,
ConnectionMedianLatencyMS: float64(mustRandInt64n(t, 99) + 1),
RxPackets: mustRandInt64n(t, 99) + 1,
RxBytes: mustRandInt64n(t, 99) + 1,
TxPackets: mustRandInt64n(t, 99) + 1,
TxBytes: mustRandInt64n(t, 99) + 1,
SessionCountVSCode: mustRandInt64n(t, 9) + 1,
SessionCountJetBrains: mustRandInt64n(t, 9) + 1,
SessionCountReconnectingPTY: mustRandInt64n(t, 9) + 1,
SessionCountSSH: mustRandInt64n(t, 9) + 1,
Metrics: []agentsdk.AgentMetric{},
}
for _, opt := range opts {
opt(&s)
}
return s
}
// deps is a set of test dependencies.
type deps struct {
Agent database.WorkspaceAgent
Template database.Template
User database.User
Workspace database.Workspace
}
// setupDeps sets up a set of test dependencies.
// It creates an organization, user, template, workspace, and agent
// along with all the other miscellaneous plumbing required to link
// them together.
func setupDeps(t *testing.T, store database.Store) deps {
t.Helper()
org := dbgen.Organization(t, store, database.Organization{})
user := dbgen.User(t, store, database.User{})
_, err := store.InsertOrganizationMember(context.Background(), database.InsertOrganizationMemberParams{
OrganizationID: org.ID,
UserID: user.ID,
Roles: []string{rbac.RoleOrgMember(org.ID)},
})
require.NoError(t, err)
tv := dbgen.TemplateVersion(t, store, database.TemplateVersion{
OrganizationID: org.ID,
CreatedBy: user.ID,
})
tpl := dbgen.Template(t, store, database.Template{
CreatedBy: user.ID,
OrganizationID: org.ID,
ActiveVersionID: tv.ID,
})
ws := dbgen.Workspace(t, store, database.Workspace{
TemplateID: tpl.ID,
OwnerID: user.ID,
OrganizationID: org.ID,
LastUsedAt: time.Now().Add(-time.Hour),
})
pj := dbgen.ProvisionerJob(t, store, database.ProvisionerJob{
InitiatorID: user.ID,
OrganizationID: org.ID,
})
_ = dbgen.WorkspaceBuild(t, store, database.WorkspaceBuild{
TemplateVersionID: tv.ID,
WorkspaceID: ws.ID,
JobID: pj.ID,
})
res := dbgen.WorkspaceResource(t, store, database.WorkspaceResource{
Transition: database.WorkspaceTransitionStart,
JobID: pj.ID,
})
agt := dbgen.WorkspaceAgent(t, store, database.WorkspaceAgent{
ResourceID: res.ID,
})
return deps{
Agent: agt,
Template: tpl,
User: user,
Workspace: ws,
}
}
// mustRandInt64n returns a random int64 in the range [0, n).
func mustRandInt64n(t *testing.T, n int64) int64 {
t.Helper()
i, err := cryptorand.Intn(int(n))
require.NoError(t, err)
return int64(i)
}

View File

@@ -43,6 +43,7 @@ import (
"github.com/coder/coder/buildinfo"
"github.com/coder/coder/coderd/audit"
"github.com/coder/coder/coderd/awsidentity"
"github.com/coder/coder/coderd/batchstats"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/pubsub"
@@ -160,6 +161,7 @@ type Options struct {
HTTPClient *http.Client
UpdateAgentMetrics func(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric)
StatsBatcher *batchstats.Batcher
}
// @title Coder API
@@ -180,6 +182,8 @@ type Options struct {
// @in header
// @name Coder-Session-Token
// New constructs a Coder API handler.
//
//nolint:gocyclo
func New(options *Options) *API {
if options == nil {
options = &Options{}
@@ -288,6 +292,10 @@ func New(options *Options) *API {
options.UserQuietHoursScheduleStore.Store(&v)
}
if options.StatsBatcher == nil {
panic("developer error: options.StatsBatcher is nil")
}
siteCacheDir := options.CacheDir
if siteCacheDir != "" {
siteCacheDir = filepath.Join(siteCacheDir, "site")
@@ -462,6 +470,8 @@ func New(options *Options) *API {
cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)
api.statsBatcher = options.StatsBatcher
r.Use(
httpmw.Recover(api.Logger),
tracing.StatusWriterMiddleware,
@@ -994,6 +1004,8 @@ type API struct {
healthCheckGroup *singleflight.Group[string, *healthcheck.Report]
healthCheckCache atomic.Pointer[healthcheck.Report]
statsBatcher *batchstats.Batcher
}
// Close waits for all WebSocket connections to drain before returning.

View File

@@ -51,11 +51,13 @@ import (
"tailscale.com/types/nettype"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/audit"
"github.com/coder/coder/coderd/autobuild"
"github.com/coder/coder/coderd/awsidentity"
"github.com/coder/coder/coderd/batchstats"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/dbtestutil"
@@ -140,7 +142,8 @@ type Options struct {
SwaggerEndpoint bool
// Logger should only be overridden if you expect errors
// as part of your test.
Logger *slog.Logger
StatsBatcher *batchstats.Batcher
}
// New constructs a codersdk client connected to an in-memory API instance.
@@ -241,6 +244,18 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
if options.FilesRateLimit == 0 {
options.FilesRateLimit = -1
}
if options.StatsBatcher == nil {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
batcher, closeBatcher, err := batchstats.New(ctx,
batchstats.WithStore(options.Database),
// Avoid cluttering up test output.
batchstats.WithLogger(slog.Make(sloghuman.Sink(io.Discard))),
)
require.NoError(t, err, "create stats batcher")
options.StatsBatcher = batcher
t.Cleanup(closeBatcher)
}
var templateScheduleStore atomic.Pointer[schedule.TemplateScheduleStore]
if options.TemplateScheduleStore == nil {
@@ -409,6 +424,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
HealthcheckFunc: options.HealthcheckFunc,
HealthcheckTimeout: options.HealthcheckTimeout,
HealthcheckRefresh: options.HealthcheckRefresh,
StatsBatcher: options.StatsBatcher,
}
}

View File

@@ -2016,6 +2016,14 @@ func (q *querier) InsertWorkspaceAgentStat(ctx context.Context, arg database.Ins
return q.db.InsertWorkspaceAgentStat(ctx, arg)
}
func (q *querier) InsertWorkspaceAgentStats(ctx context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
if err := q.authorizeContext(ctx, rbac.ActionCreate, rbac.ResourceSystem); err != nil {
return err
}
return q.db.InsertWorkspaceAgentStats(ctx, arg)
}
func (q *querier) InsertWorkspaceApp(ctx context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
if err := q.authorizeContext(ctx, rbac.ActionCreate, rbac.ResourceSystem); err != nil {
return database.WorkspaceApp{}, err

View File

@@ -2810,8 +2810,12 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
}
statByAgent := map[uuid.UUID]database.GetWorkspaceAgentStatsRow{}
- for _, agentStat := range latestAgentStats {
- stat := statByAgent[agentStat.AgentID]
+ for agentID, agentStat := range latestAgentStats {
+ stat := statByAgent[agentID]
+ stat.AgentID = agentStat.AgentID
+ stat.TemplateID = agentStat.TemplateID
+ stat.UserID = agentStat.UserID
+ stat.WorkspaceID = agentStat.WorkspaceID
stat.SessionCountVSCode += agentStat.SessionCountVSCode
stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
@@ -4177,6 +4181,49 @@ func (q *FakeQuerier) InsertWorkspaceAgentStat(_ context.Context, p database.Ins
return stat, nil
}
func (q *FakeQuerier) InsertWorkspaceAgentStats(_ context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
err := validateDatabaseType(arg)
if err != nil {
return err
}
q.mutex.Lock()
defer q.mutex.Unlock()
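// The batcher marshals all ConnectionsByProto entries into a single JSON
// array, so unmarshal it back into one map per row.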
var connectionsByProto []map[string]int64
if err := json.Unmarshal(arg.ConnectionsByProto, &connectionsByProto); err != nil {
return err
}
for i := 0; i < len(arg.ID); i++ {
cbp, err := json.Marshal(connectionsByProto[i])
if err != nil {
return xerrors.Errorf("failed to marshal connections_by_proto: %w", err)
}
stat := database.WorkspaceAgentStat{
ID: arg.ID[i],
CreatedAt: arg.CreatedAt[i],
WorkspaceID: arg.WorkspaceID[i],
AgentID: arg.AgentID[i],
UserID: arg.UserID[i],
ConnectionsByProto: cbp,
ConnectionCount: arg.ConnectionCount[i],
RxPackets: arg.RxPackets[i],
RxBytes: arg.RxBytes[i],
TxPackets: arg.TxPackets[i],
TxBytes: arg.TxBytes[i],
TemplateID: arg.TemplateID[i],
SessionCountVSCode: arg.SessionCountVSCode[i],
SessionCountJetBrains: arg.SessionCountJetBrains[i],
SessionCountReconnectingPTY: arg.SessionCountReconnectingPTY[i],
SessionCountSSH: arg.SessionCountSSH[i],
ConnectionMedianLatencyMS: arg.ConnectionMedianLatencyMS[i],
}
q.workspaceAgentStats = append(q.workspaceAgentStats, stat)
}
return nil
}
func (q *FakeQuerier) InsertWorkspaceApp(_ context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
if err := validateDatabaseType(arg); err != nil {
return database.WorkspaceApp{}, err

View File

@@ -1236,6 +1236,13 @@ func (m metricsStore) InsertWorkspaceAgentStat(ctx context.Context, arg database
return stat, err
}
func (m metricsStore) InsertWorkspaceAgentStats(ctx context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
start := time.Now()
r0 := m.s.InsertWorkspaceAgentStats(ctx, arg)
m.queryLatencies.WithLabelValues("InsertWorkspaceAgentStats").Observe(time.Since(start).Seconds())
return r0
}
func (m metricsStore) InsertWorkspaceApp(ctx context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
start := time.Now()
app, err := m.s.InsertWorkspaceApp(ctx, arg)

View File

@@ -2598,6 +2598,20 @@ func (mr *MockStoreMockRecorder) InsertWorkspaceAgentStat(arg0, arg1 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceAgentStat", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceAgentStat), arg0, arg1)
}
// InsertWorkspaceAgentStats mocks base method.
func (m *MockStore) InsertWorkspaceAgentStats(arg0 context.Context, arg1 database.InsertWorkspaceAgentStatsParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "InsertWorkspaceAgentStats", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// InsertWorkspaceAgentStats indicates an expected call of InsertWorkspaceAgentStats.
func (mr *MockStoreMockRecorder) InsertWorkspaceAgentStats(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceAgentStats", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceAgentStats), arg0, arg1)
}
// InsertWorkspaceApp mocks base method.
func (m *MockStore) InsertWorkspaceApp(arg0 context.Context, arg1 database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
m.ctrl.T.Helper()

View File

@@ -225,6 +225,7 @@ type sqlcQuerier interface {
InsertWorkspaceAgentLogs(ctx context.Context, arg InsertWorkspaceAgentLogsParams) ([]WorkspaceAgentLog, error)
InsertWorkspaceAgentMetadata(ctx context.Context, arg InsertWorkspaceAgentMetadataParams) error
InsertWorkspaceAgentStat(ctx context.Context, arg InsertWorkspaceAgentStatParams) (WorkspaceAgentStat, error)
InsertWorkspaceAgentStats(ctx context.Context, arg InsertWorkspaceAgentStatsParams) error
InsertWorkspaceApp(ctx context.Context, arg InsertWorkspaceAppParams) (WorkspaceApp, error)
InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspaceBuildParams) error
InsertWorkspaceBuildParameters(ctx context.Context, arg InsertWorkspaceBuildParametersParams) error

View File

@@ -7418,6 +7418,90 @@ func (q *sqlQuerier) InsertWorkspaceAgentStat(ctx context.Context, arg InsertWor
return i, err
}
const insertWorkspaceAgentStats = `-- name: InsertWorkspaceAgentStats :exec
INSERT INTO
workspace_agent_stats (
id,
created_at,
user_id,
workspace_id,
template_id,
agent_id,
connections_by_proto,
connection_count,
rx_packets,
rx_bytes,
tx_packets,
tx_bytes,
session_count_vscode,
session_count_jetbrains,
session_count_reconnecting_pty,
session_count_ssh,
connection_median_latency_ms
)
SELECT
unnest($1 :: uuid[]) AS id,
unnest($2 :: timestamptz[]) AS created_at,
unnest($3 :: uuid[]) AS user_id,
unnest($4 :: uuid[]) AS workspace_id,
unnest($5 :: uuid[]) AS template_id,
unnest($6 :: uuid[]) AS agent_id,
jsonb_array_elements($7 :: jsonb) AS connections_by_proto,
unnest($8 :: bigint[]) AS connection_count,
unnest($9 :: bigint[]) AS rx_packets,
unnest($10 :: bigint[]) AS rx_bytes,
unnest($11 :: bigint[]) AS tx_packets,
unnest($12 :: bigint[]) AS tx_bytes,
unnest($13 :: bigint[]) AS session_count_vscode,
unnest($14 :: bigint[]) AS session_count_jetbrains,
unnest($15 :: bigint[]) AS session_count_reconnecting_pty,
unnest($16 :: bigint[]) AS session_count_ssh,
unnest($17 :: double precision[]) AS connection_median_latency_ms
`
type InsertWorkspaceAgentStatsParams struct {
ID []uuid.UUID `db:"id" json:"id"`
CreatedAt []time.Time `db:"created_at" json:"created_at"`
UserID []uuid.UUID `db:"user_id" json:"user_id"`
WorkspaceID []uuid.UUID `db:"workspace_id" json:"workspace_id"`
TemplateID []uuid.UUID `db:"template_id" json:"template_id"`
AgentID []uuid.UUID `db:"agent_id" json:"agent_id"`
ConnectionsByProto json.RawMessage `db:"connections_by_proto" json:"connections_by_proto"`
ConnectionCount []int64 `db:"connection_count" json:"connection_count"`
RxPackets []int64 `db:"rx_packets" json:"rx_packets"`
RxBytes []int64 `db:"rx_bytes" json:"rx_bytes"`
TxPackets []int64 `db:"tx_packets" json:"tx_packets"`
TxBytes []int64 `db:"tx_bytes" json:"tx_bytes"`
SessionCountVSCode []int64 `db:"session_count_vscode" json:"session_count_vscode"`
SessionCountJetBrains []int64 `db:"session_count_jetbrains" json:"session_count_jetbrains"`
SessionCountReconnectingPTY []int64 `db:"session_count_reconnecting_pty" json:"session_count_reconnecting_pty"`
SessionCountSSH []int64 `db:"session_count_ssh" json:"session_count_ssh"`
ConnectionMedianLatencyMS []float64 `db:"connection_median_latency_ms" json:"connection_median_latency_ms"`
}
func (q *sqlQuerier) InsertWorkspaceAgentStats(ctx context.Context, arg InsertWorkspaceAgentStatsParams) error {
_, err := q.db.ExecContext(ctx, insertWorkspaceAgentStats,
pq.Array(arg.ID),
pq.Array(arg.CreatedAt),
pq.Array(arg.UserID),
pq.Array(arg.WorkspaceID),
pq.Array(arg.TemplateID),
pq.Array(arg.AgentID),
arg.ConnectionsByProto,
pq.Array(arg.ConnectionCount),
pq.Array(arg.RxPackets),
pq.Array(arg.RxBytes),
pq.Array(arg.TxPackets),
pq.Array(arg.TxBytes),
pq.Array(arg.SessionCountVSCode),
pq.Array(arg.SessionCountJetBrains),
pq.Array(arg.SessionCountReconnectingPTY),
pq.Array(arg.SessionCountSSH),
pq.Array(arg.ConnectionMedianLatencyMS),
)
return err
}
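
Why connections_by_proto is the odd one out above: pq.Array can bind Go slices of scalars as Postgres arrays, but there is no comparable binding from []map[string]int64 to jsonb[]. The batcher therefore marshals the whole batch into a single jsonb array, which jsonb_array_elements expands back into one row per element. Since set-returning functions in a SELECT list advance in lockstep (PostgreSQL 10 and later), element i lines up with row i of every unnest() column. A rough sketch of the shapes involved, using made-up values:

// Hypothetical two-stat batch, for illustration only.
ids := []uuid.UUID{uuid.New(), uuid.New()} // unnest($1 :: uuid[]) yields two rows
byProto := []map[string]int64{
	{"ssh": 1},
	{"vscode": 2},
}
payload, _ := json.Marshal(byProto) // [{"ssh":1},{"vscode":2}]
// jsonb_array_elements($7 :: jsonb) also yields two rows, so row i of the
// INSERT receives ids[i] together with byProto[i].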
const getWorkspaceAppByAgentIDAndSlug = `-- name: GetWorkspaceAppByAgentIDAndSlug :one
SELECT id, created_at, agent_id, display_name, icon, command, url, healthcheck_url, healthcheck_interval, healthcheck_threshold, health, subdomain, sharing_level, slug, external FROM workspace_apps WHERE agent_id = $1 AND slug = $2
`

View File

@@ -22,6 +22,46 @@ INSERT INTO
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING *;
-- name: InsertWorkspaceAgentStats :exec
INSERT INTO
workspace_agent_stats (
id,
created_at,
user_id,
workspace_id,
template_id,
agent_id,
connections_by_proto,
connection_count,
rx_packets,
rx_bytes,
tx_packets,
tx_bytes,
session_count_vscode,
session_count_jetbrains,
session_count_reconnecting_pty,
session_count_ssh,
connection_median_latency_ms
)
SELECT
unnest(@id :: uuid[]) AS id,
unnest(@created_at :: timestamptz[]) AS created_at,
unnest(@user_id :: uuid[]) AS user_id,
unnest(@workspace_id :: uuid[]) AS workspace_id,
unnest(@template_id :: uuid[]) AS template_id,
unnest(@agent_id :: uuid[]) AS agent_id,
jsonb_array_elements(@connections_by_proto :: jsonb) AS connections_by_proto,
unnest(@connection_count :: bigint[]) AS connection_count,
unnest(@rx_packets :: bigint[]) AS rx_packets,
unnest(@rx_bytes :: bigint[]) AS rx_bytes,
unnest(@tx_packets :: bigint[]) AS tx_packets,
unnest(@tx_bytes :: bigint[]) AS tx_bytes,
unnest(@session_count_vscode :: bigint[]) AS session_count_vscode,
unnest(@session_count_jetbrains :: bigint[]) AS session_count_jetbrains,
unnest(@session_count_reconnecting_pty :: bigint[]) AS session_count_reconnecting_pty,
unnest(@session_count_ssh :: bigint[]) AS session_count_ssh,
unnest(@connection_median_latency_ms :: double precision[]) AS connection_median_latency_ms;
-- name: GetTemplateDAUs :many
SELECT
(created_at at TIME ZONE cast(@tz_offset::integer as text))::date as date,

View File

@@ -11,6 +11,9 @@ import (
"testing"
"time"
"github.com/coder/coder/coderd/batchstats"
"github.com/coder/coder/coderd/database/dbtestutil"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
@@ -372,9 +375,29 @@ func TestAgents(t *testing.T) {
func TestAgentStats(t *testing.T) {
t.Parallel()
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ t.Cleanup(cancelFunc)
+ db, pubsub := dbtestutil.NewDB(t)
+ log := slogtest.Make(t, nil)
+ batcher, closeBatcher, err := batchstats.New(ctx,
+ batchstats.WithStore(db),
+ // We want our stats, and we want them NOW.
+ batchstats.WithBatchSize(1),
+ batchstats.WithInterval(time.Hour),
+ batchstats.WithLogger(log),
+ )
+ require.NoError(t, err, "create stats batcher failed")
+ t.Cleanup(closeBatcher)
// Build sample workspaces with test agents and fake agent client
- client, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
- db := api.Database
+ client, _, _ := coderdtest.NewWithAPI(t, &coderdtest.Options{
+ Database: db,
+ IncludeProvisionerDaemon: true,
+ Pubsub: pubsub,
+ StatsBatcher: batcher,
+ })
user := coderdtest.CreateFirstUser(t, client)
@@ -384,11 +407,7 @@ func TestAgentStats(t *testing.T) {
registry := prometheus.NewRegistry()
- ctx, cancelFunc := context.WithCancel(context.Background())
- defer cancelFunc()
// given
- var err error
var i int64
for i = 0; i < 3; i++ {
_, err = agent1.PostStats(ctx, &agentsdk.Stats{

View File

@@ -1410,36 +1410,12 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
activityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID)
}
- payload, err := json.Marshal(req.ConnectionsByProto)
- if err != nil {
- api.Logger.Error(ctx, "marshal agent connections by proto", slog.F("workspace_agent_id", workspaceAgent.ID), slog.Error(err))
- payload = json.RawMessage("{}")
- }
now := database.Now()
var errGroup errgroup.Group
errGroup.Go(func() error {
- _, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{
- ID: uuid.New(),
- CreatedAt: now,
- AgentID: workspaceAgent.ID,
- WorkspaceID: workspace.ID,
- UserID: workspace.OwnerID,
- TemplateID: workspace.TemplateID,
- ConnectionsByProto: payload,
- ConnectionCount: req.ConnectionCount,
- RxPackets: req.RxPackets,
- RxBytes: req.RxBytes,
- TxPackets: req.TxPackets,
- TxBytes: req.TxBytes,
- SessionCountVSCode: req.SessionCountVSCode,
- SessionCountJetBrains: req.SessionCountJetBrains,
- SessionCountReconnectingPTY: req.SessionCountReconnectingPTY,
- SessionCountSSH: req.SessionCountSSH,
- ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS,
- })
- if err != nil {
+ if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil {
+ api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err))
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
}
return nil