mirror of https://github.com/coder/coder.git
chore: convert agent stats to use a table (#6374)
* chore: convert workspace agent stats from json to table * chore: convert agent stats to use a table Backwards compatibility becomes hard when all agent stats are in a JSON blob. We also want to query this table for new agents that are failing health checks so we can display it in the UI. * Fix migration using default values
This commit is contained in:
parent
7cf1e20aac
commit
05e449943d
|
@ -767,12 +767,12 @@ func (a *agent) init(ctx context.Context) {
|
|||
|
||||
func convertAgentStats(counts map[netlogtype.Connection]netlogtype.Counts) *agentsdk.Stats {
|
||||
stats := &agentsdk.Stats{
|
||||
ConnsByProto: map[string]int64{},
|
||||
NumConns: int64(len(counts)),
|
||||
ConnectionsByProto: map[string]int64{},
|
||||
ConnectionCount: int64(len(counts)),
|
||||
}
|
||||
|
||||
for conn, count := range counts {
|
||||
stats.ConnsByProto[conn.Proto.String()]++
|
||||
stats.ConnectionsByProto[conn.Proto.String()]++
|
||||
stats.RxPackets += int64(count.RxPackets)
|
||||
stats.RxBytes += int64(count.RxBytes)
|
||||
stats.TxPackets += int64(count.TxPackets)
|
||||
|
|
|
@ -73,7 +73,7 @@ func TestAgent_Stats_SSH(t *testing.T) {
|
|||
require.Eventuallyf(t, func() bool {
|
||||
var ok bool
|
||||
s, ok = <-stats
|
||||
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
|
||||
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0
|
||||
}, testutil.WaitLong, testutil.IntervalFast,
|
||||
"never saw stats: %+v", s,
|
||||
)
|
||||
|
@ -102,7 +102,7 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
|
|||
require.Eventuallyf(t, func() bool {
|
||||
var ok bool
|
||||
s, ok = <-stats
|
||||
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
|
||||
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0
|
||||
}, testutil.WaitLong, testutil.IntervalFast,
|
||||
"never saw stats: %+v", s,
|
||||
)
|
||||
|
|
|
@ -5202,14 +5202,14 @@ const docTemplate = `{
|
|||
"type": "object",
|
||||
"properties": {
|
||||
"conns_by_proto": {
|
||||
"description": "ConnsByProto is a count of connections by protocol.",
|
||||
"description": "ConnectionsByProto is a count of connections by protocol.",
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"num_comms": {
|
||||
"description": "NumConns is the number of connections received by an agent.",
|
||||
"description": "ConnectionCount is the number of connections received by an agent.",
|
||||
"type": "integer"
|
||||
},
|
||||
"rx_bytes": {
|
||||
|
|
|
@ -4595,14 +4595,14 @@
|
|||
"type": "object",
|
||||
"properties": {
|
||||
"conns_by_proto": {
|
||||
"description": "ConnsByProto is a count of connections by protocol.",
|
||||
"description": "ConnectionsByProto is a count of connections by protocol.",
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"num_comms": {
|
||||
"description": "NumConns is the number of connections received by an agent.",
|
||||
"description": "ConnectionCount is the number of connections received by an agent.",
|
||||
"type": "integer"
|
||||
},
|
||||
"rx_bytes": {
|
||||
|
|
|
@ -1413,18 +1413,18 @@ func (q *querier) UpdateWorkspaceAgentConnectionByID(ctx context.Context, arg da
|
|||
return update(q.log, q.auth, fetch, q.db.UpdateWorkspaceAgentConnectionByID)(ctx, arg)
|
||||
}
|
||||
|
||||
func (q *querier) InsertAgentStat(ctx context.Context, arg database.InsertAgentStatParams) (database.AgentStat, error) {
|
||||
func (q *querier) InsertWorkspaceAgentStat(ctx context.Context, arg database.InsertWorkspaceAgentStatParams) (database.WorkspaceAgentStat, error) {
|
||||
// TODO: This is a workspace agent operation. Should users be able to query this?
|
||||
// Not really sure what this is for.
|
||||
workspace, err := q.db.GetWorkspaceByID(ctx, arg.WorkspaceID)
|
||||
if err != nil {
|
||||
return database.AgentStat{}, err
|
||||
return database.WorkspaceAgentStat{}, err
|
||||
}
|
||||
err = q.authorizeContext(ctx, rbac.ActionUpdate, workspace)
|
||||
if err != nil {
|
||||
return database.AgentStat{}, err
|
||||
return database.WorkspaceAgentStat{}, err
|
||||
}
|
||||
return q.db.InsertAgentStat(ctx, arg)
|
||||
return q.db.InsertWorkspaceAgentStat(ctx, arg)
|
||||
}
|
||||
|
||||
func (q *querier) UpdateWorkspaceAppHealthByID(ctx context.Context, arg database.UpdateWorkspaceAppHealthByIDParams) error {
|
||||
|
|
|
@ -1170,9 +1170,9 @@ func (s *MethodTestSuite) TestWorkspace() {
|
|||
ID: agt.ID,
|
||||
}).Asserts(ws, rbac.ActionUpdate).Returns()
|
||||
}))
|
||||
s.Run("InsertAgentStat", s.Subtest(func(db database.Store, check *expects) {
|
||||
s.Run("InsertWorkspaceAgentStat", s.Subtest(func(db database.Store, check *expects) {
|
||||
ws := dbgen.Workspace(s.T(), db, database.Workspace{})
|
||||
check.Args(database.InsertAgentStatParams{
|
||||
check.Args(database.InsertWorkspaceAgentStatParams{
|
||||
WorkspaceID: ws.ID,
|
||||
}).Asserts(ws, rbac.ActionUpdate)
|
||||
}))
|
||||
|
|
|
@ -197,8 +197,8 @@ func (q *querier) GetWorkspaceResourceMetadataCreatedAfter(ctx context.Context,
|
|||
return q.db.GetWorkspaceResourceMetadataCreatedAfter(ctx, createdAt)
|
||||
}
|
||||
|
||||
func (q *querier) DeleteOldAgentStats(ctx context.Context) error {
|
||||
return q.db.DeleteOldAgentStats(ctx)
|
||||
func (q *querier) DeleteOldWorkspaceAgentStats(ctx context.Context) error {
|
||||
return q.db.DeleteOldWorkspaceAgentStats(ctx)
|
||||
}
|
||||
|
||||
func (q *querier) GetParameterSchemasCreatedAfter(ctx context.Context, createdAt time.Time) ([]database.ParameterSchema, error) {
|
||||
|
|
|
@ -128,7 +128,7 @@ func (s *MethodTestSuite) TestSystemFunctions() {
|
|||
_ = dbgen.WorkspaceResourceMetadatums(s.T(), db, database.WorkspaceResourceMetadatum{})
|
||||
check.Args(time.Now()).Asserts()
|
||||
}))
|
||||
s.Run("DeleteOldAgentStats", s.Subtest(func(db database.Store, check *expects) {
|
||||
s.Run("DeleteOldWorkspaceAgentStats", s.Subtest(func(db database.Store, check *expects) {
|
||||
check.Args().Asserts()
|
||||
}))
|
||||
s.Run("GetParameterSchemasCreatedAfter", s.Subtest(func(db database.Store, check *expects) {
|
||||
|
|
|
@ -40,7 +40,6 @@ func New() database.Store {
|
|||
mutex: &sync.RWMutex{},
|
||||
data: &data{
|
||||
apiKeys: make([]database.APIKey, 0),
|
||||
agentStats: make([]database.AgentStat, 0),
|
||||
organizationMembers: make([]database.OrganizationMember, 0),
|
||||
organizations: make([]database.Organization, 0),
|
||||
users: make([]database.User, 0),
|
||||
|
@ -60,6 +59,7 @@ func New() database.Store {
|
|||
provisionerJobs: make([]database.ProvisionerJob, 0),
|
||||
templateVersions: make([]database.TemplateVersion, 0),
|
||||
templates: make([]database.Template, 0),
|
||||
workspaceAgentStats: make([]database.WorkspaceAgentStat, 0),
|
||||
workspaceBuilds: make([]database.WorkspaceBuild, 0),
|
||||
workspaceApps: make([]database.WorkspaceApp, 0),
|
||||
workspaces: make([]database.Workspace, 0),
|
||||
|
@ -98,7 +98,7 @@ type data struct {
|
|||
userLinks []database.UserLink
|
||||
|
||||
// New tables
|
||||
agentStats []database.AgentStat
|
||||
workspaceAgentStats []database.WorkspaceAgentStat
|
||||
auditLogs []database.AuditLog
|
||||
files []database.File
|
||||
gitAuthLinks []database.GitAuthLink
|
||||
|
@ -258,29 +258,34 @@ func (q *fakeQuerier) AcquireProvisionerJob(_ context.Context, arg database.Acqu
|
|||
return database.ProvisionerJob{}, sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (*fakeQuerier) DeleteOldAgentStats(_ context.Context) error {
|
||||
func (*fakeQuerier) DeleteOldWorkspaceAgentStats(_ context.Context) error {
|
||||
// no-op
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *fakeQuerier) InsertAgentStat(_ context.Context, p database.InsertAgentStatParams) (database.AgentStat, error) {
|
||||
func (q *fakeQuerier) InsertWorkspaceAgentStat(_ context.Context, p database.InsertWorkspaceAgentStatParams) (database.WorkspaceAgentStat, error) {
|
||||
if err := validateDatabaseType(p); err != nil {
|
||||
return database.AgentStat{}, err
|
||||
return database.WorkspaceAgentStat{}, err
|
||||
}
|
||||
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
stat := database.AgentStat{
|
||||
ID: p.ID,
|
||||
CreatedAt: p.CreatedAt,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
AgentID: p.AgentID,
|
||||
UserID: p.UserID,
|
||||
Payload: p.Payload,
|
||||
TemplateID: p.TemplateID,
|
||||
stat := database.WorkspaceAgentStat{
|
||||
ID: p.ID,
|
||||
CreatedAt: p.CreatedAt,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
AgentID: p.AgentID,
|
||||
UserID: p.UserID,
|
||||
ConnectionsByProto: p.ConnectionsByProto,
|
||||
ConnectionCount: p.ConnectionCount,
|
||||
RxPackets: p.RxPackets,
|
||||
RxBytes: p.RxBytes,
|
||||
TxPackets: p.TxPackets,
|
||||
TxBytes: p.TxBytes,
|
||||
TemplateID: p.TemplateID,
|
||||
}
|
||||
q.agentStats = append(q.agentStats, stat)
|
||||
q.workspaceAgentStats = append(q.workspaceAgentStats, stat)
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
|
@ -290,7 +295,7 @@ func (q *fakeQuerier) GetTemplateDAUs(_ context.Context, templateID uuid.UUID) (
|
|||
|
||||
seens := make(map[time.Time]map[uuid.UUID]struct{})
|
||||
|
||||
for _, as := range q.agentStats {
|
||||
for _, as := range q.workspaceAgentStats {
|
||||
if as.TemplateID != templateID {
|
||||
continue
|
||||
}
|
||||
|
@ -330,7 +335,7 @@ func (q *fakeQuerier) GetDeploymentDAUs(_ context.Context) ([]database.GetDeploy
|
|||
|
||||
seens := make(map[time.Time]map[uuid.UUID]struct{})
|
||||
|
||||
for _, as := range q.agentStats {
|
||||
for _, as := range q.workspaceAgentStats {
|
||||
date := as.CreatedAt.Truncate(time.Hour * 24)
|
||||
|
||||
dateEntry := seens[date]
|
||||
|
|
|
@ -123,16 +123,6 @@ CREATE TYPE workspace_transition AS ENUM (
|
|||
'delete'
|
||||
);
|
||||
|
||||
CREATE TABLE agent_stats (
|
||||
id uuid NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
user_id uuid NOT NULL,
|
||||
agent_id uuid NOT NULL,
|
||||
workspace_id uuid NOT NULL,
|
||||
template_id uuid NOT NULL,
|
||||
payload jsonb NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE api_keys (
|
||||
id text NOT NULL,
|
||||
hashed_secret bytea NOT NULL,
|
||||
|
@ -472,6 +462,21 @@ CREATE TABLE users (
|
|||
last_seen_at timestamp without time zone DEFAULT '0001-01-01 00:00:00'::timestamp without time zone NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE workspace_agent_stats (
|
||||
id uuid NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
user_id uuid NOT NULL,
|
||||
agent_id uuid NOT NULL,
|
||||
workspace_id uuid NOT NULL,
|
||||
template_id uuid NOT NULL,
|
||||
connections_by_proto jsonb DEFAULT '{}'::jsonb NOT NULL,
|
||||
connection_count integer DEFAULT 0 NOT NULL,
|
||||
rx_packets integer DEFAULT 0 NOT NULL,
|
||||
rx_bytes integer DEFAULT 0 NOT NULL,
|
||||
tx_packets integer DEFAULT 0 NOT NULL,
|
||||
tx_bytes integer DEFAULT 0 NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE workspace_agents (
|
||||
id uuid NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
|
@ -611,7 +616,7 @@ ALTER TABLE ONLY provisioner_job_logs ALTER COLUMN id SET DEFAULT nextval('provi
|
|||
|
||||
ALTER TABLE ONLY workspace_resource_metadata ALTER COLUMN id SET DEFAULT nextval('workspace_resource_metadata_id_seq'::regclass);
|
||||
|
||||
ALTER TABLE ONLY agent_stats
|
||||
ALTER TABLE ONLY workspace_agent_stats
|
||||
ADD CONSTRAINT agent_stats_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE ONLY api_keys
|
||||
|
@ -734,9 +739,9 @@ ALTER TABLE ONLY workspace_resources
|
|||
ALTER TABLE ONLY workspaces
|
||||
ADD CONSTRAINT workspaces_pkey PRIMARY KEY (id);
|
||||
|
||||
CREATE INDEX idx_agent_stats_created_at ON agent_stats USING btree (created_at);
|
||||
CREATE INDEX idx_agent_stats_created_at ON workspace_agent_stats USING btree (created_at);
|
||||
|
||||
CREATE INDEX idx_agent_stats_user_id ON agent_stats USING btree (user_id);
|
||||
CREATE INDEX idx_agent_stats_user_id ON workspace_agent_stats USING btree (user_id);
|
||||
|
||||
CREATE INDEX idx_api_keys_user ON api_keys USING btree (user_id);
|
||||
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
ALTER TABLE workspace_agent_stats RENAME TO agent_stats;
|
||||
|
||||
ALTER TABLE agent_stats ADD COLUMN payload jsonb NOT NULL DEFAULT '{}'::jsonb;
|
||||
ALTER TABLE agent_stats DROP COLUMN connections_by_proto,
|
||||
DROP COLUMN connection_count,
|
||||
DROP COLUMN rx_packets,
|
||||
DROP COLUMN rx_bytes,
|
||||
DROP COLUMN tx_packets,
|
||||
DROP COLUMN tx_bytes;
|
|
@ -0,0 +1,18 @@
|
|||
ALTER TABLE agent_stats RENAME TO workspace_agent_stats;
|
||||
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN connections_by_proto jsonb NOT NULL DEFAULT '{}'::jsonb;
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN connection_count integer DEFAULT 0 NOT NULL;
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN rx_packets integer DEFAULT 0 NOT NULL;
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN rx_bytes integer DEFAULT 0 NOT NULL;
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN tx_packets integer DEFAULT 0 NOT NULL;
|
||||
ALTER TABLE workspace_agent_stats ADD COLUMN tx_bytes integer DEFAULT 0 NOT NULL;
|
||||
|
||||
UPDATE workspace_agent_stats SET
|
||||
connections_by_proto = coalesce((payload ->> 'conns_by_proto')::jsonb, '{}'::jsonb),
|
||||
connection_count = coalesce((payload ->> 'num_conns')::integer, 0),
|
||||
rx_packets = coalesce((payload ->> 'rx_packets')::integer, 0),
|
||||
rx_bytes = coalesce((payload ->> 'rx_bytes')::integer, 0),
|
||||
tx_packets = coalesce((payload ->> 'tx_packets')::integer, 0),
|
||||
tx_bytes = coalesce((payload ->> 'tx_bytes')::integer, 0);
|
||||
|
||||
ALTER TABLE workspace_agent_stats DROP COLUMN payload;
|
|
@ -1218,16 +1218,6 @@ type APIKey struct {
|
|||
Scope APIKeyScope `db:"scope" json:"scope"`
|
||||
}
|
||||
|
||||
type AgentStat struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
AgentID uuid.UUID `db:"agent_id" json:"agent_id"`
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
Payload json.RawMessage `db:"payload" json:"payload"`
|
||||
}
|
||||
|
||||
type AuditLog struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
Time time.Time `db:"time" json:"time"`
|
||||
|
@ -1558,6 +1548,21 @@ type WorkspaceAgent struct {
|
|||
ExpandedDirectory string `db:"expanded_directory" json:"expanded_directory"`
|
||||
}
|
||||
|
||||
type WorkspaceAgentStat struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
AgentID uuid.UUID `db:"agent_id" json:"agent_id"`
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
ConnectionsByProto json.RawMessage `db:"connections_by_proto" json:"connections_by_proto"`
|
||||
ConnectionCount int32 `db:"connection_count" json:"connection_count"`
|
||||
RxPackets int32 `db:"rx_packets" json:"rx_packets"`
|
||||
RxBytes int32 `db:"rx_bytes" json:"rx_bytes"`
|
||||
TxPackets int32 `db:"tx_packets" json:"tx_packets"`
|
||||
TxBytes int32 `db:"tx_bytes" json:"tx_bytes"`
|
||||
}
|
||||
|
||||
type WorkspaceApp struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
|
|
|
@ -26,7 +26,7 @@ type sqlcQuerier interface {
|
|||
DeleteGroupMemberFromGroup(ctx context.Context, arg DeleteGroupMemberFromGroupParams) error
|
||||
DeleteGroupMembersByOrgAndUser(ctx context.Context, arg DeleteGroupMembersByOrgAndUserParams) error
|
||||
DeleteLicense(ctx context.Context, id int32) (int32, error)
|
||||
DeleteOldAgentStats(ctx context.Context) error
|
||||
DeleteOldWorkspaceAgentStats(ctx context.Context) error
|
||||
DeleteParameterValueByID(ctx context.Context, id uuid.UUID) error
|
||||
DeleteReplicasUpdatedBefore(ctx context.Context, updatedAt time.Time) error
|
||||
GetAPIKeyByID(ctx context.Context, id string) (APIKey, error)
|
||||
|
@ -133,7 +133,6 @@ type sqlcQuerier interface {
|
|||
GetWorkspaceResourcesCreatedAfter(ctx context.Context, createdAt time.Time) ([]WorkspaceResource, error)
|
||||
GetWorkspaces(ctx context.Context, arg GetWorkspacesParams) ([]GetWorkspacesRow, error)
|
||||
InsertAPIKey(ctx context.Context, arg InsertAPIKeyParams) (APIKey, error)
|
||||
InsertAgentStat(ctx context.Context, arg InsertAgentStatParams) (AgentStat, error)
|
||||
// We use the organization_id as the id
|
||||
// for simplicity since all users is
|
||||
// every member of the org.
|
||||
|
@ -168,6 +167,7 @@ type sqlcQuerier interface {
|
|||
InsertUserLink(ctx context.Context, arg InsertUserLinkParams) (UserLink, error)
|
||||
InsertWorkspace(ctx context.Context, arg InsertWorkspaceParams) (Workspace, error)
|
||||
InsertWorkspaceAgent(ctx context.Context, arg InsertWorkspaceAgentParams) (WorkspaceAgent, error)
|
||||
InsertWorkspaceAgentStat(ctx context.Context, arg InsertWorkspaceAgentStatParams) (WorkspaceAgentStat, error)
|
||||
InsertWorkspaceApp(ctx context.Context, arg InsertWorkspaceAppParams) (WorkspaceApp, error)
|
||||
InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspaceBuildParams) (WorkspaceBuild, error)
|
||||
InsertWorkspaceBuildParameters(ctx context.Context, arg InsertWorkspaceBuildParametersParams) error
|
||||
|
|
|
@ -16,145 +16,6 @@ import (
|
|||
"github.com/tabbed/pqtype"
|
||||
)
|
||||
|
||||
const deleteOldAgentStats = `-- name: DeleteOldAgentStats :exec
|
||||
DELETE FROM agent_stats WHERE created_at < NOW() - INTERVAL '30 days'
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) DeleteOldAgentStats(ctx context.Context) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteOldAgentStats)
|
||||
return err
|
||||
}
|
||||
|
||||
const getDeploymentDAUs = `-- name: GetDeploymentDAUs :many
|
||||
SELECT
|
||||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
agent_stats
|
||||
GROUP BY
|
||||
date, user_id
|
||||
ORDER BY
|
||||
date ASC
|
||||
`
|
||||
|
||||
type GetDeploymentDAUsRow struct {
|
||||
Date time.Time `db:"date" json:"date"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) GetDeploymentDAUs(ctx context.Context) ([]GetDeploymentDAUsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getDeploymentDAUs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetDeploymentDAUsRow
|
||||
for rows.Next() {
|
||||
var i GetDeploymentDAUsRow
|
||||
if err := rows.Scan(&i.Date, &i.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getTemplateDAUs = `-- name: GetTemplateDAUs :many
|
||||
SELECT
|
||||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
agent_stats
|
||||
WHERE
|
||||
template_id = $1
|
||||
GROUP BY
|
||||
date, user_id
|
||||
ORDER BY
|
||||
date ASC
|
||||
`
|
||||
|
||||
type GetTemplateDAUsRow struct {
|
||||
Date time.Time `db:"date" json:"date"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) GetTemplateDAUs(ctx context.Context, templateID uuid.UUID) ([]GetTemplateDAUsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getTemplateDAUs, templateID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetTemplateDAUsRow
|
||||
for rows.Next() {
|
||||
var i GetTemplateDAUsRow
|
||||
if err := rows.Scan(&i.Date, &i.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertAgentStat = `-- name: InsertAgentStat :one
|
||||
INSERT INTO
|
||||
agent_stats (
|
||||
id,
|
||||
created_at,
|
||||
user_id,
|
||||
workspace_id,
|
||||
template_id,
|
||||
agent_id,
|
||||
payload
|
||||
)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5, $6, $7) RETURNING id, created_at, user_id, agent_id, workspace_id, template_id, payload
|
||||
`
|
||||
|
||||
type InsertAgentStatParams struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
AgentID uuid.UUID `db:"agent_id" json:"agent_id"`
|
||||
Payload json.RawMessage `db:"payload" json:"payload"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) InsertAgentStat(ctx context.Context, arg InsertAgentStatParams) (AgentStat, error) {
|
||||
row := q.db.QueryRowContext(ctx, insertAgentStat,
|
||||
arg.ID,
|
||||
arg.CreatedAt,
|
||||
arg.UserID,
|
||||
arg.WorkspaceID,
|
||||
arg.TemplateID,
|
||||
arg.AgentID,
|
||||
arg.Payload,
|
||||
)
|
||||
var i AgentStat
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.CreatedAt,
|
||||
&i.UserID,
|
||||
&i.AgentID,
|
||||
&i.WorkspaceID,
|
||||
&i.TemplateID,
|
||||
&i.Payload,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteAPIKeyByID = `-- name: DeleteAPIKeyByID :exec
|
||||
DELETE
|
||||
FROM
|
||||
|
@ -5450,6 +5311,165 @@ func (q *sqlQuerier) UpdateWorkspaceAgentStartupByID(ctx context.Context, arg Up
|
|||
return err
|
||||
}
|
||||
|
||||
const deleteOldWorkspaceAgentStats = `-- name: DeleteOldWorkspaceAgentStats :exec
|
||||
DELETE FROM workspace_agent_stats WHERE created_at < NOW() - INTERVAL '30 days'
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) DeleteOldWorkspaceAgentStats(ctx context.Context) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteOldWorkspaceAgentStats)
|
||||
return err
|
||||
}
|
||||
|
||||
const getDeploymentDAUs = `-- name: GetDeploymentDAUs :many
|
||||
SELECT
|
||||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
workspace_agent_stats
|
||||
GROUP BY
|
||||
date, user_id
|
||||
ORDER BY
|
||||
date ASC
|
||||
`
|
||||
|
||||
type GetDeploymentDAUsRow struct {
|
||||
Date time.Time `db:"date" json:"date"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) GetDeploymentDAUs(ctx context.Context) ([]GetDeploymentDAUsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getDeploymentDAUs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetDeploymentDAUsRow
|
||||
for rows.Next() {
|
||||
var i GetDeploymentDAUsRow
|
||||
if err := rows.Scan(&i.Date, &i.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getTemplateDAUs = `-- name: GetTemplateDAUs :many
|
||||
SELECT
|
||||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
workspace_agent_stats
|
||||
WHERE
|
||||
template_id = $1
|
||||
GROUP BY
|
||||
date, user_id
|
||||
ORDER BY
|
||||
date ASC
|
||||
`
|
||||
|
||||
type GetTemplateDAUsRow struct {
|
||||
Date time.Time `db:"date" json:"date"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) GetTemplateDAUs(ctx context.Context, templateID uuid.UUID) ([]GetTemplateDAUsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getTemplateDAUs, templateID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetTemplateDAUsRow
|
||||
for rows.Next() {
|
||||
var i GetTemplateDAUsRow
|
||||
if err := rows.Scan(&i.Date, &i.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertWorkspaceAgentStat = `-- name: InsertWorkspaceAgentStat :one
|
||||
INSERT INTO
|
||||
workspace_agent_stats (
|
||||
id,
|
||||
created_at,
|
||||
user_id,
|
||||
workspace_id,
|
||||
template_id,
|
||||
agent_id,
|
||||
connections_by_proto,
|
||||
connection_count,
|
||||
rx_packets,
|
||||
rx_bytes,
|
||||
tx_packets,
|
||||
tx_bytes
|
||||
)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id, created_at, user_id, agent_id, workspace_id, template_id, connections_by_proto, connection_count, rx_packets, rx_bytes, tx_packets, tx_bytes
|
||||
`
|
||||
|
||||
type InsertWorkspaceAgentStatParams struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
AgentID uuid.UUID `db:"agent_id" json:"agent_id"`
|
||||
ConnectionsByProto json.RawMessage `db:"connections_by_proto" json:"connections_by_proto"`
|
||||
ConnectionCount int32 `db:"connection_count" json:"connection_count"`
|
||||
RxPackets int32 `db:"rx_packets" json:"rx_packets"`
|
||||
RxBytes int32 `db:"rx_bytes" json:"rx_bytes"`
|
||||
TxPackets int32 `db:"tx_packets" json:"tx_packets"`
|
||||
TxBytes int32 `db:"tx_bytes" json:"tx_bytes"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) InsertWorkspaceAgentStat(ctx context.Context, arg InsertWorkspaceAgentStatParams) (WorkspaceAgentStat, error) {
|
||||
row := q.db.QueryRowContext(ctx, insertWorkspaceAgentStat,
|
||||
arg.ID,
|
||||
arg.CreatedAt,
|
||||
arg.UserID,
|
||||
arg.WorkspaceID,
|
||||
arg.TemplateID,
|
||||
arg.AgentID,
|
||||
arg.ConnectionsByProto,
|
||||
arg.ConnectionCount,
|
||||
arg.RxPackets,
|
||||
arg.RxBytes,
|
||||
arg.TxPackets,
|
||||
arg.TxBytes,
|
||||
)
|
||||
var i WorkspaceAgentStat
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.CreatedAt,
|
||||
&i.UserID,
|
||||
&i.AgentID,
|
||||
&i.WorkspaceID,
|
||||
&i.TemplateID,
|
||||
&i.ConnectionsByProto,
|
||||
&i.ConnectionCount,
|
||||
&i.RxPackets,
|
||||
&i.RxBytes,
|
||||
&i.TxPackets,
|
||||
&i.TxBytes,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getWorkspaceAppByAgentIDAndSlug = `-- name: GetWorkspaceAppByAgentIDAndSlug :one
|
||||
SELECT id, created_at, agent_id, display_name, icon, command, url, healthcheck_url, healthcheck_interval, healthcheck_threshold, health, subdomain, sharing_level, slug, external FROM workspace_apps WHERE agent_id = $1 AND slug = $2
|
||||
`
|
||||
|
|
|
@ -1,23 +1,28 @@
|
|||
-- name: InsertAgentStat :one
|
||||
-- name: InsertWorkspaceAgentStat :one
|
||||
INSERT INTO
|
||||
agent_stats (
|
||||
workspace_agent_stats (
|
||||
id,
|
||||
created_at,
|
||||
user_id,
|
||||
workspace_id,
|
||||
template_id,
|
||||
agent_id,
|
||||
payload
|
||||
connections_by_proto,
|
||||
connection_count,
|
||||
rx_packets,
|
||||
rx_bytes,
|
||||
tx_packets,
|
||||
tx_bytes
|
||||
)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5, $6, $7) RETURNING *;
|
||||
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
|
||||
|
||||
-- name: GetTemplateDAUs :many
|
||||
SELECT
|
||||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
agent_stats
|
||||
workspace_agent_stats
|
||||
WHERE
|
||||
template_id = $1
|
||||
GROUP BY
|
||||
|
@ -30,11 +35,11 @@ SELECT
|
|||
(created_at at TIME ZONE 'UTC')::date as date,
|
||||
user_id
|
||||
FROM
|
||||
agent_stats
|
||||
workspace_agent_stats
|
||||
GROUP BY
|
||||
date, user_id
|
||||
ORDER BY
|
||||
date ASC;
|
||||
|
||||
-- name: DeleteOldAgentStats :exec
|
||||
DELETE FROM agent_stats WHERE created_at < NOW() - INTERVAL '30 days';
|
||||
-- name: DeleteOldWorkspaceAgentStats :exec
|
||||
DELETE FROM workspace_agent_stats WHERE created_at < NOW() - INTERVAL '30 days';
|
|
@ -145,7 +145,7 @@ func countUniqueUsers(rows []database.GetTemplateDAUsRow) int {
|
|||
func (c *Cache) refresh(ctx context.Context) error {
|
||||
//nolint:gocritic // This is a system service.
|
||||
ctx = dbauthz.AsSystemRestricted(ctx)
|
||||
err := c.database.DeleteOldAgentStats(ctx)
|
||||
err := c.database.DeleteOldWorkspaceAgentStats(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("delete old stats: %w", err)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ func TestCache_TemplateUsers(t *testing.T) {
|
|||
)
|
||||
|
||||
type args struct {
|
||||
rows []database.InsertAgentStatParams
|
||||
rows []database.InsertWorkspaceAgentStatParams
|
||||
}
|
||||
type want struct {
|
||||
entries []codersdk.DAUEntry
|
||||
|
@ -45,7 +45,7 @@ func TestCache_TemplateUsers(t *testing.T) {
|
|||
{"empty", args{}, want{nil, 0}},
|
||||
{
|
||||
"one hole", args{
|
||||
rows: []database.InsertAgentStatParams{
|
||||
rows: []database.InsertWorkspaceAgentStatParams{
|
||||
{
|
||||
CreatedAt: date(2022, 8, 27),
|
||||
UserID: zebra,
|
||||
|
@ -75,7 +75,7 @@ func TestCache_TemplateUsers(t *testing.T) {
|
|||
}, 1},
|
||||
},
|
||||
{"no holes", args{
|
||||
rows: []database.InsertAgentStatParams{
|
||||
rows: []database.InsertWorkspaceAgentStatParams{
|
||||
{
|
||||
CreatedAt: date(2022, 8, 27),
|
||||
UserID: zebra,
|
||||
|
@ -104,7 +104,7 @@ func TestCache_TemplateUsers(t *testing.T) {
|
|||
},
|
||||
}, 1}},
|
||||
{"holes", args{
|
||||
rows: []database.InsertAgentStatParams{
|
||||
rows: []database.InsertWorkspaceAgentStatParams{
|
||||
{
|
||||
CreatedAt: date(2022, 1, 1),
|
||||
UserID: zebra,
|
||||
|
@ -179,7 +179,7 @@ func TestCache_TemplateUsers(t *testing.T) {
|
|||
|
||||
for _, row := range tt.args.rows {
|
||||
row.TemplateID = template.ID
|
||||
db.InsertAgentStat(context.Background(), row)
|
||||
db.InsertWorkspaceAgentStat(context.Background(), row)
|
||||
}
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
|
|
|
@ -933,21 +933,26 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
|
|||
|
||||
activityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID)
|
||||
|
||||
payload, err := json.Marshal(req)
|
||||
payload, err := json.Marshal(req.ConnectionsByProto)
|
||||
if err != nil {
|
||||
api.Logger.Error(ctx, "marshal agent stats report", slog.Error(err))
|
||||
api.Logger.Error(ctx, "marshal agent connections by proto", slog.F("workspace_agent", workspaceAgent.ID), slog.Error(err))
|
||||
payload = json.RawMessage("{}")
|
||||
}
|
||||
|
||||
now := database.Now()
|
||||
_, err = api.Database.InsertAgentStat(ctx, database.InsertAgentStatParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: now,
|
||||
AgentID: workspaceAgent.ID,
|
||||
WorkspaceID: workspace.ID,
|
||||
UserID: workspace.OwnerID,
|
||||
TemplateID: workspace.TemplateID,
|
||||
Payload: payload,
|
||||
_, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: now,
|
||||
AgentID: workspaceAgent.ID,
|
||||
WorkspaceID: workspace.ID,
|
||||
UserID: workspace.OwnerID,
|
||||
TemplateID: workspace.TemplateID,
|
||||
ConnectionsByProto: payload,
|
||||
ConnectionCount: int32(req.ConnectionCount),
|
||||
RxPackets: int32(req.RxPackets),
|
||||
RxBytes: int32(req.RxBytes),
|
||||
TxPackets: int32(req.TxPackets),
|
||||
TxBytes: int32(req.TxBytes),
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
|
|
|
@ -1178,12 +1178,12 @@ func TestWorkspaceAgentReportStats(t *testing.T) {
|
|||
agentClient.SetSessionToken(authToken)
|
||||
|
||||
_, err := agentClient.PostStats(context.Background(), &agentsdk.Stats{
|
||||
ConnsByProto: map[string]int64{"TCP": 1},
|
||||
NumConns: 1,
|
||||
RxPackets: 1,
|
||||
RxBytes: 1,
|
||||
TxPackets: 1,
|
||||
TxBytes: 1,
|
||||
ConnectionsByProto: map[string]int64{"TCP": 1},
|
||||
ConnectionCount: 1,
|
||||
RxPackets: 1,
|
||||
RxBytes: 1,
|
||||
TxPackets: 1,
|
||||
TxBytes: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -397,7 +397,7 @@ func (c *Client) ReportStats(ctx context.Context, log slog.Logger, statsChan <-c
|
|||
}
|
||||
|
||||
// Send an empty stat to get the interval.
|
||||
postStat(&Stats{ConnsByProto: map[string]int64{}})
|
||||
postStat(&Stats{ConnectionsByProto: map[string]int64{}})
|
||||
|
||||
go func() {
|
||||
defer close(exited)
|
||||
|
@ -426,10 +426,10 @@ func (c *Client) ReportStats(ctx context.Context, log slog.Logger, statsChan <-c
|
|||
// Stats records the Agent's network connection statistics for use in
|
||||
// user-facing metrics and debugging.
|
||||
type Stats struct {
|
||||
// ConnsByProto is a count of connections by protocol.
|
||||
ConnsByProto map[string]int64 `json:"conns_by_proto"`
|
||||
// NumConns is the number of connections received by an agent.
|
||||
NumConns int64 `json:"num_comms"`
|
||||
// ConnectionsByProto is a count of connections by protocol.
|
||||
ConnectionsByProto map[string]int64 `json:"conns_by_proto"`
|
||||
// ConnectionCount is the number of connections received by an agent.
|
||||
ConnectionCount int64 `json:"num_comms"`
|
||||
// RxPackets is the number of received packets.
|
||||
RxPackets int64 `json:"rx_packets"`
|
||||
// RxBytes is the number of received bytes.
|
||||
|
|
|
@ -79,7 +79,7 @@ func TestAgentReportStats(t *testing.T) {
|
|||
chanLen := 3
|
||||
statCh := make(chan *agentsdk.Stats, chanLen)
|
||||
for i := 0; i < chanLen; i++ {
|
||||
statCh <- &agentsdk.Stats{ConnsByProto: map[string]int64{}}
|
||||
statCh <- &agentsdk.Stats{ConnectionsByProto: map[string]int64{}}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
|
Loading…
Reference in New Issue