coder/coderd/agentapi/stats.go

148 lines
4.7 KiB
Go

package agentapi
import (
"context"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/google/uuid"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/codersdk"
)
// StatsBatcher is the sink that UpdateStats hands each agent stats report to.
type StatsBatcher interface {
	// Add records the stats st reported at time now. UpdateStats passes the
	// reporting agent's ID, the workspace's template ID, the workspace
	// owner's ID and the workspace ID, in that order.
	Add(now time.Time, agentID, templateID, userID, workspaceID uuid.UUID, st *agentproto.Stats) error
}
// StatsAPI implements the agent stats reporting endpoint (see UpdateStats)
// together with the dependencies that endpoint needs.
type StatsAPI struct {
	// AgentFn resolves the workspace agent a request is being served for.
	AgentFn func(context.Context) (database.WorkspaceAgent, error)

	// Database is used to look up the agent's workspace and its owner, and
	// to update the workspace's last-used timestamp.
	Database database.Store

	// Pubsub receives the workspace notification published after a report
	// has been processed.
	Pubsub pubsub.Pubsub

	// Log is the logger used for debug output and non-fatal failures.
	Log slog.Logger

	// StatsBatcher receives every non-empty stats report.
	StatsBatcher StatsBatcher

	// TemplateScheduleStore is loaded on each report that shows connections
	// for a workspace with an autostart schedule, to compute the next
	// autostart passed to the activity bump.
	TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]

	// AgentStatsRefreshInterval is returned to the agent as the report
	// interval in every UpdateStats response.
	AgentStatsRefreshInterval time.Duration

	// UpdateAgentMetricsFn is optional. When non-nil it is called with the
	// labels of the reporting agent and the metrics carried in the report.
	UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)

	// TimeNowFn is optional. When nil, dbtime.Now() is used instead.
	TimeNowFn func() time.Time
}
// now returns the current time, taken from TimeNowFn when one is configured
// and from dbtime.Now() otherwise.
func (a *StatsAPI) now() time.Time {
	if a.TimeNowFn == nil {
		return dbtime.Now()
	}
	return a.TimeNowFn()
}
// UpdateStats handles a stats report sent by a workspace agent.
//
// The response always carries AgentStatsRefreshInterval as the report
// interval; a request without stats gets only that. For a request with stats
// the agent's workspace is looked up, its activity is bumped when the report
// shows at least one connection, and the report is then processed
// concurrently by the stats batcher, the workspace last-used update and, when
// UpdateAgentMetricsFn is set, the agent metrics callback. After all of those
// succeed a workspace notification is published.
//
// An error is returned when the agent or workspace cannot be resolved, or
// when any of the concurrent updates fails. A failure to load the template
// schedule or to publish the notification is only logged.
func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsRequest) (*agentproto.UpdateStatsResponse, error) {
	resp := &agentproto.UpdateStatsResponse{
		ReportInterval: durationpb.New(a.AgentStatsRefreshInterval),
	}

	// A request without stats is only asking for the report interval.
	if req.Stats == nil {
		return resp, nil
	}

	agent, err := a.AgentFn(ctx)
	if err != nil {
		return nil, err
	}

	row, err := a.Database.GetWorkspaceByAgentID(ctx, agent.ID)
	if err != nil {
		return nil, xerrors.Errorf("get workspace by agent ID %q: %w", agent.ID, err)
	}
	ws := row.Workspace

	a.Log.Debug(ctx, "read stats report",
		slog.F("interval", a.AgentStatsRefreshInterval),
		slog.F("workspace_id", ws.ID),
		slog.F("payload", req),
	)

	// One timestamp is shared by the autostart calculation, the batcher and
	// the last-used update.
	reportedAt := a.now()

	if req.Stats.ConnectionCount > 0 {
		// Stays the zero time unless an allowed next autostart can be
		// computed below.
		var nextAutostart time.Time
		if autostart := ws.AutostartSchedule.String; autostart != "" {
			store := *a.TemplateScheduleStore.Load()
			templateSchedule, err := store.Get(ctx, a.Database, ws.TemplateID)
			if err != nil {
				// Not fatal: log it and bump without a next autostart.
				a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
					slog.F("workspace_id", ws.ID),
					slog.F("template_id", ws.TemplateID),
					slog.Error(err),
				)
			} else if next, ok := schedule.NextAutostart(reportedAt, autostart, templateSchedule); ok {
				nextAutostart = next
			}
		}
		ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, ws.ID, nextAutostart)
	}

	var eg errgroup.Group

	// Hand the report to the stats batcher.
	eg.Go(func() error {
		if err := a.StatsBatcher.Add(reportedAt, agent.ID, ws.TemplateID, ws.OwnerID, ws.ID, req.Stats); err != nil {
			a.Log.Error(ctx, "add agent stats to batcher", slog.Error(err))
			return xerrors.Errorf("insert workspace agent stats batch: %w", err)
		}
		return nil
	})

	// Record when the workspace was last used.
	eg.Go(func() error {
		params := database.UpdateWorkspaceLastUsedAtParams{
			ID:         ws.ID,
			LastUsedAt: reportedAt,
		}
		if err := a.Database.UpdateWorkspaceLastUsedAt(ctx, params); err != nil {
			return xerrors.Errorf("update workspace LastUsedAt: %w", err)
		}
		return nil
	})

	// Forward the agent's metrics; the owner lookup supplies the username label.
	if a.UpdateAgentMetricsFn != nil {
		eg.Go(func() error {
			owner, err := a.Database.GetUserByID(ctx, ws.OwnerID)
			if err != nil {
				return xerrors.Errorf("get user: %w", err)
			}
			labels := prometheusmetrics.AgentMetricLabels{
				Username:      owner.Username,
				WorkspaceName: ws.Name,
				AgentName:     agent.Name,
				TemplateName:  row.TemplateName,
			}
			a.UpdateAgentMetricsFn(ctx, labels, req.Stats.Metrics)
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return nil, xerrors.Errorf("update stats in database: %w", err)
	}

	// Everything is stored, so let the frontend know about the new report.
	a.publishWorkspaceAgentStats(ctx, ws.ID)
	return resp, nil
}
// publishWorkspaceAgentStats publishes an empty message on the notify channel
// of the given workspace. A publish failure is logged as a warning and is not
// returned to the caller.
func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) {
	channel := codersdk.WorkspaceNotifyChannel(workspaceID)
	if err := a.Pubsub.Publish(channel, []byte{}); err != nil {
		a.Log.Warn(ctx, "failed to publish workspace agent stats",
			slog.F("workspace_id", workspaceID),
			slog.Error(err),
		)
	}
}