fix(coderd): workspaceapps: update last_used_at when workspace app reports stats (#11603)

- Adds a new query BatchUpdateLastUsedAt
- Adds calls to BatchUpdateLastUsedAt in app stats handler upon flush
- Passes a stats flush channel to apptest setup scaffolding and updates unit tests to assert modifications to LastUsedAt.
This commit is contained in:
Cian Johnston 2024-01-16 14:06:39 +00:00 committed by GitHub
parent 5bfbf9f9e6
commit d583acad00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 186 additions and 15 deletions

View File

@ -695,6 +695,15 @@ func (q *querier) ArchiveUnusedTemplateVersions(ctx context.Context, arg databas
return q.db.ArchiveUnusedTemplateVersions(ctx, arg)
}
func (q *querier) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg database.BatchUpdateWorkspaceLastUsedAtParams) error {
// Could be any workspace and checking auth to each workspace is overkill for the purpose
// of this function.
if err := q.authorizeContext(ctx, rbac.ActionUpdate, rbac.ResourceWorkspace.All()); err != nil {
return err
}
return q.db.BatchUpdateWorkspaceLastUsedAt(ctx, arg)
}
func (q *querier) CleanTailnetCoordinators(ctx context.Context) error {
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err

View File

@ -1549,6 +1549,13 @@ func (s *MethodTestSuite) TestWorkspace() {
ID: ws.ID,
}).Asserts(ws, rbac.ActionUpdate).Returns()
}))
s.Run("BatchUpdateWorkspaceLastUsedAt", s.Subtest(func(db database.Store, check *expects) {
ws1 := dbgen.Workspace(s.T(), db, database.Workspace{})
ws2 := dbgen.Workspace(s.T(), db, database.Workspace{})
check.Args(database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{ws1.ID, ws2.ID},
}).Asserts(rbac.ResourceWorkspace.All(), rbac.ActionUpdate).Returns()
}))
s.Run("UpdateWorkspaceTTL", s.Subtest(func(db database.Store, check *expects) {
ws := dbgen.Workspace(s.T(), db, database.Workspace{})
check.Args(database.UpdateWorkspaceTTLParams{

View File

@ -963,6 +963,31 @@ func (q *FakeQuerier) ArchiveUnusedTemplateVersions(_ context.Context, arg datab
return archived, nil
}
func (q *FakeQuerier) BatchUpdateWorkspaceLastUsedAt(_ context.Context, arg database.BatchUpdateWorkspaceLastUsedAtParams) error {
err := validateDatabaseType(arg)
if err != nil {
return err
}
q.mutex.Lock()
defer q.mutex.Unlock()
// temporary map to avoid O(q.workspaces*arg.workspaceIds)
m := make(map[uuid.UUID]struct{})
for _, id := range arg.IDs {
m[id] = struct{}{}
}
n := 0
for i := 0; i < len(q.workspaces); i++ {
if _, found := m[q.workspaces[i].ID]; !found {
continue
}
q.workspaces[i].LastUsedAt = arg.LastUsedAt
n++
}
return nil
}
func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error {
return ErrUnimplemented
}

View File

@ -114,6 +114,13 @@ func (m metricsStore) ArchiveUnusedTemplateVersions(ctx context.Context, arg dat
return r0, r1
}
func (m metricsStore) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg database.BatchUpdateWorkspaceLastUsedAtParams) error {
start := time.Now()
r0 := m.s.BatchUpdateWorkspaceLastUsedAt(ctx, arg)
m.queryLatencies.WithLabelValues("BatchUpdateWorkspaceLastUsedAt").Observe(time.Since(start).Seconds())
return r0
}
func (m metricsStore) CleanTailnetCoordinators(ctx context.Context) error {
start := time.Now()
err := m.s.CleanTailnetCoordinators(ctx)

View File

@ -117,6 +117,20 @@ func (mr *MockStoreMockRecorder) ArchiveUnusedTemplateVersions(arg0, arg1 any) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ArchiveUnusedTemplateVersions", reflect.TypeOf((*MockStore)(nil).ArchiveUnusedTemplateVersions), arg0, arg1)
}
// BatchUpdateWorkspaceLastUsedAt mocks base method.
func (m *MockStore) BatchUpdateWorkspaceLastUsedAt(arg0 context.Context, arg1 database.BatchUpdateWorkspaceLastUsedAtParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BatchUpdateWorkspaceLastUsedAt", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// BatchUpdateWorkspaceLastUsedAt indicates an expected call of BatchUpdateWorkspaceLastUsedAt.
func (mr *MockStoreMockRecorder) BatchUpdateWorkspaceLastUsedAt(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchUpdateWorkspaceLastUsedAt", reflect.TypeOf((*MockStore)(nil).BatchUpdateWorkspaceLastUsedAt), arg0, arg1)
}
// CleanTailnetCoordinators mocks base method.
func (m *MockStore) CleanTailnetCoordinators(arg0 context.Context) error {
m.ctrl.T.Helper()

View File

@ -42,6 +42,7 @@ type sqlcQuerier interface {
// Only unused template versions will be archived, which are any versions not
// referenced by the latest build of a workspace.
ArchiveUnusedTemplateVersions(ctx context.Context, arg ArchiveUnusedTemplateVersionsParams) ([]uuid.UUID, error)
BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg BatchUpdateWorkspaceLastUsedAtParams) error
CleanTailnetCoordinators(ctx context.Context) error
CleanTailnetLostPeers(ctx context.Context) error
CleanTailnetTunnels(ctx context.Context) error

View File

@ -10813,6 +10813,25 @@ func (q *sqlQuerier) InsertWorkspaceResourceMetadata(ctx context.Context, arg In
return items, nil
}
const batchUpdateWorkspaceLastUsedAt = `-- name: BatchUpdateWorkspaceLastUsedAt :exec
UPDATE
workspaces
SET
last_used_at = $1
WHERE
id = ANY($2 :: uuid[])
`
type BatchUpdateWorkspaceLastUsedAtParams struct {
LastUsedAt time.Time `db:"last_used_at" json:"last_used_at"`
IDs []uuid.UUID `db:"ids" json:"ids"`
}
func (q *sqlQuerier) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg BatchUpdateWorkspaceLastUsedAtParams) error {
_, err := q.db.ExecContext(ctx, batchUpdateWorkspaceLastUsedAt, arg.LastUsedAt, pq.Array(arg.IDs))
return err
}
const getDeploymentWorkspaceStats = `-- name: GetDeploymentWorkspaceStats :one
WITH workspaces_with_jobs AS (
SELECT

View File

@ -357,6 +357,14 @@ SET
WHERE
id = $1;
-- name: BatchUpdateWorkspaceLastUsedAt :exec
UPDATE
workspaces
SET
last_used_at = @last_used_at
WHERE
id = ANY(@ids :: uuid[]);
-- name: GetDeploymentWorkspaceStats :one
WITH workspaces_with_jobs AS (
SELECT

View File

@ -4,8 +4,9 @@ import (
"context"
"time"
"cdr.dev/slog"
"nhooyr.io/websocket"
"cdr.dev/slog"
)
// Heartbeat loops to ping a WebSocket to keep it alive.

View File

@ -1626,7 +1626,7 @@ func TestWorkspaceAgentExternalAuthListen(t *testing.T) {
cancel()
// We expect only 1
// In a failed test, you will likely see 9, as the last one
// gets cancelled.
// gets canceled.
require.Equal(t, 1, validateCalls, "validate calls duplicated on same token")
})
}

View File

@ -64,6 +64,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
// reconnecting-pty proxy server we want to test is mounted.
client := appDetails.AppClient(t)
testReconnectingPTY(ctx, t, client, appDetails.Agent.ID, "")
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("SignedTokenQueryParameter", func(t *testing.T) {
@ -92,6 +93,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
// Make an unauthenticated client.
unauthedAppClient := codersdk.New(appDetails.AppClient(t).URL)
testReconnectingPTY(ctx, t, unauthedAppClient, appDetails.Agent.ID, issueRes.SignedToken)
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
})
@ -117,6 +119,9 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(body), "Path-based applications are disabled")
// Even though path-based apps are disabled, the request should indicate
// that the workspace was used.
assertWorkspaceLastUsedAtNotUpdated(t, appDetails)
})
t.Run("LoginWithoutAuthOnPrimary", func(t *testing.T) {
@ -142,6 +147,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
require.True(t, loc.Query().Has("message"))
require.True(t, loc.Query().Has("redirect"))
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("LoginWithoutAuthOnProxy", func(t *testing.T) {
@ -179,6 +185,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
// request is getting stripped.
require.Equal(t, u.Path, redirectURI.Path+"/")
require.Equal(t, u.RawQuery, redirectURI.RawQuery)
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("NoAccessShould404", func(t *testing.T) {
@ -195,6 +202,8 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusNotFound, resp.StatusCode)
// TODO(cian): A blocked request should not count as workspace usage.
// assertWorkspaceLastUsedAtNotUpdated(t, appDetails.AppClient(t), appDetails)
})
t.Run("RedirectsWithSlash", func(t *testing.T) {
@ -209,6 +218,8 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusTemporaryRedirect, resp.StatusCode)
// TODO(cian): The initial redirect should not count as workspace usage.
// assertWorkspaceLastUsedAtNotUpdated(t, appDetails.AppClient(t), appDetails)
})
t.Run("RedirectsWithQuery", func(t *testing.T) {
@ -226,6 +237,8 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
loc, err := resp.Location()
require.NoError(t, err)
require.Equal(t, proxyTestAppQuery, loc.RawQuery)
// TODO(cian): The initial redirect should not count as workspace usage.
// assertWorkspaceLastUsedAtNotUpdated(t, appDetails.AppClient(t), appDetails)
})
t.Run("Proxies", func(t *testing.T) {
@ -267,6 +280,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
require.Equal(t, proxyTestAppBody, string(body))
require.Equal(t, http.StatusOK, resp.StatusCode)
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("ProxiesHTTPS", func(t *testing.T) {
@ -312,6 +326,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
require.Equal(t, proxyTestAppBody, string(body))
require.Equal(t, http.StatusOK, resp.StatusCode)
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("BlocksMe", func(t *testing.T) {
@ -331,6 +346,8 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(body), "must be accessed with the full username, not @me")
// TODO(cian): A blocked request should not count as workspace usage.
// assertWorkspaceLastUsedAtNotUpdated(t, appDetails.AppClient(t), appDetails)
})
t.Run("ForwardsIP", func(t *testing.T) {
@ -349,6 +366,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.Equal(t, proxyTestAppBody, string(body))
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, "1.1.1.1,127.0.0.1", resp.Header.Get("X-Forwarded-For"))
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("ProxyError", func(t *testing.T) {
@ -361,6 +379,9 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusBadGateway, resp.StatusCode)
// An valid authenticated attempt to access a workspace app
// should count as usage regardless of success.
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
t.Run("NoProxyPort", func(t *testing.T) {
@ -375,6 +396,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
// TODO(@deansheather): This should be 400. There's a todo in the
// resolve request code to fix this.
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assertWorkspaceLastUsedAtUpdated(t, appDetails)
})
})
@ -1430,16 +1452,12 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
t.Run("ReportStats", func(t *testing.T) {
t.Parallel()
flush := make(chan chan<- struct{}, 1)
reporter := &fakeStatsReporter{}
appDetails := setupProxyTest(t, &DeploymentOptions{
StatsCollectorOptions: workspaceapps.StatsCollectorOptions{
Reporter: reporter,
ReportInterval: time.Hour,
RollupWindow: time.Minute,
Flush: flush,
},
})
@ -1457,10 +1475,7 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
var stats []workspaceapps.StatsReport
require.Eventually(t, func() bool {
// Keep flushing until we get a non-empty stats report.
flushDone := make(chan struct{}, 1)
flush <- flushDone
<-flushDone
appDetails.FlushStats()
stats = reporter.stats()
return len(stats) > 0
}, testutil.WaitLong, testutil.IntervalFast, "stats not reported")
@ -1549,3 +1564,28 @@ func testReconnectingPTY(ctx context.Context, t *testing.T, client *codersdk.Cli
// Ensure the connection closes.
require.ErrorIs(t, tr.ReadUntil(ctx, nil), io.EOF)
}
// Accessing an app should update the workspace's LastUsedAt.
// NOTE: Despite our efforts with the flush channel, this is inherently racy.
func assertWorkspaceLastUsedAtUpdated(t testing.TB, details *Details) {
t.Helper()
// Wait for stats to fully flush.
require.Eventually(t, func() bool {
details.FlushStats()
ws, err := details.SDKClient.Workspace(context.Background(), details.Workspace.ID)
assert.NoError(t, err)
return ws.LastUsedAt.After(details.Workspace.LastUsedAt)
}, testutil.WaitShort, testutil.IntervalMedium, "workspace LastUsedAt not updated when it should have been")
}
// Except when it sometimes shouldn't (e.g. no access)
// NOTE: Despite our efforts with the flush channel, this is inherently racy.
func assertWorkspaceLastUsedAtNotUpdated(t testing.TB, details *Details) {
t.Helper()
details.FlushStats()
ws, err := details.SDKClient.Workspace(context.Background(), details.Workspace.ID)
require.NoError(t, err)
require.Equal(t, ws.LastUsedAt, details.Workspace.LastUsedAt, "workspace LastUsedAt updated when it should not have been")
}

View File

@ -71,6 +71,7 @@ type Deployment struct {
SDKClient *codersdk.Client
FirstUser codersdk.CreateFirstUserResponse
PathAppBaseURL *url.URL
FlushStats func()
}
// DeploymentFactory generates a deployment with an API client, a path base URL,

View File

@ -13,6 +13,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/slice"
)
const (
@ -117,11 +118,25 @@ func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error
batch.Requests = batch.Requests[:0]
}
}
if len(batch.UserID) > 0 {
err := tx.InsertWorkspaceAppStats(ctx, batch)
if err != nil {
return err
}
if len(batch.UserID) == 0 {
return nil
}
if err := tx.InsertWorkspaceAppStats(ctx, batch); err != nil {
return err
}
// TODO: We currently measure workspace usage based on when we get stats from it.
// There are currently two paths for this:
// 1) From SSH -> workspace agent stats POSTed from agent
// 2) From workspace apps / rpty -> workspace app stats (from coderd / wsproxy)
// Ideally we would have a single code path for this.
uniqueIDs := slice.Unique(batch.WorkspaceID)
if err := tx.BatchUpdateWorkspaceLastUsedAt(ctx, database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: uniqueIDs,
LastUsedAt: dbtime.Now(), // This isn't 100% accurate, but it's good enough.
}); err != nil {
return err
}
return nil
@ -234,6 +249,7 @@ func (sc *StatsCollector) Collect(report StatsReport) {
}
delete(sc.statsBySessionID, report.SessionID)
}
sc.opts.Logger.Debug(sc.ctx, "collected workspace app stats", slog.F("report", report))
}
// rollup performs stats rollup for sessions that fall within the

View File

@ -262,6 +262,13 @@ func TestWorkspaceApps(t *testing.T) {
opts.AppHost = ""
}
flushStatsCollectorCh := make(chan chan<- struct{}, 1)
opts.StatsCollectorOptions.Flush = flushStatsCollectorCh
flushStats := func() {
flushStatsCollectorDone := make(chan struct{}, 1)
flushStatsCollectorCh <- flushStatsCollectorDone
<-flushStatsCollectorDone
}
client := coderdtest.New(t, &coderdtest.Options{
DeploymentValues: deploymentValues,
AppHostname: opts.AppHost,
@ -285,6 +292,7 @@ func TestWorkspaceApps(t *testing.T) {
SDKClient: client,
FirstUser: user,
PathAppBaseURL: client.URL,
FlushStats: flushStats,
}
})
}

View File

@ -37,6 +37,9 @@ type ProxyOptions struct {
// ProxyURL is optional
ProxyURL *url.URL
// FlushStats is optional
FlushStats chan chan<- struct{}
}
// NewWorkspaceProxy will configure a wsproxy.Server with the given options.
@ -113,6 +116,9 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie
// Inherit collector options from coderd, but keep the wsproxy reporter.
statsCollectorOptions := coderdAPI.Options.WorkspaceAppsStatsCollectorOptions
statsCollectorOptions.Reporter = nil
if options.FlushStats != nil {
statsCollectorOptions.Flush = options.FlushStats
}
wssrv, err := wsproxy.New(ctx, &wsproxy.Options{
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),

View File

@ -442,6 +442,13 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
"*",
}
proxyStatsCollectorFlushCh := make(chan chan<- struct{}, 1)
flushStats := func() {
proxyStatsCollectorFlushDone := make(chan struct{}, 1)
proxyStatsCollectorFlushCh <- proxyStatsCollectorFlushDone
<-proxyStatsCollectorFlushDone
}
client, closer, api, user := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
Options: &coderdtest.Options{
DeploymentValues: deploymentValues,
@ -476,6 +483,7 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
Name: "best-proxy",
AppHostname: opts.AppHost,
DisablePathApps: opts.DisablePathApps,
FlushStats: proxyStatsCollectorFlushCh,
})
return &apptest.Deployment{
@ -483,6 +491,7 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
SDKClient: client,
FirstUser: user,
PathAppBaseURL: proxyAPI.Options.AccessURL,
FlushStats: flushStats,
}
})
}