From 92aa1eba97f3b0c90d0607496e9af5a172acb14f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 16:44:12 +0000 Subject: [PATCH] fix(cli): port-forward: update workspace last_used_at (#12659) This PR updates the coder port-forward command to periodically inform coderd that the workspace is being used: - Adds workspaceusage.Tracker which periodically batch-updates workspace LastUsedAt - Adds coderd endpoint to signal workspace usage - Updates coder port-forward to periodically hit this endpoint - Modifies BatchUpdateWorkspacesLastUsedAt to avoid overwriting with stale data Co-authored-by: Danny Kopping --- cli/portforward.go | 3 + cli/portforward_test.go | 29 ++- cli/server.go | 8 + coderd/apidoc/docs.go | 29 +++ coderd/apidoc/swagger.json | 27 +++ coderd/coderd.go | 15 ++ coderd/coderdtest/coderdtest.go | 34 ++++ coderd/database/dbmem/dbmem.go | 4 + coderd/database/queries.sql.go | 3 + coderd/database/queries/workspaces.sql | 5 +- coderd/workspaces.go | 18 ++ coderd/workspaceusage/tracker.go | 234 +++++++++++++++++++++++++ coderd/workspaceusage/tracker_test.go | 225 ++++++++++++++++++++++++ codersdk/workspaces.go | 50 ++++++ docs/api/workspaces.md | 26 +++ 15 files changed, 708 insertions(+), 2 deletions(-) create mode 100644 coderd/workspaceusage/tracker.go create mode 100644 coderd/workspaceusage/tracker_test.go diff --git a/cli/portforward.go b/cli/portforward.go index 68a076d590..ebe925a6a3 100644 --- a/cli/portforward.go +++ b/cli/portforward.go @@ -136,6 +136,8 @@ func (r *RootCmd) portForward() *serpent.Command { listeners[i] = l } + stopUpdating := client.UpdateWorkspaceUsageContext(ctx, workspace.ID) + // Wait for the context to be canceled or for a signal and close // all listeners. var closeErr error @@ -156,6 +158,7 @@ func (r *RootCmd) portForward() *serpent.Command { } cancel() + stopUpdating() closeAllListeners() }() diff --git a/cli/portforward_test.go b/cli/portforward_test.go index 9ea5335c43..edef520c23 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -21,6 +21,7 @@ import ( "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbfake" + "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/pty/ptytest" "github.com/coder/coder/v2/testutil" @@ -96,7 +97,12 @@ func TestPortForward(t *testing.T) { // Setup agent once to be shared between test-cases (avoid expensive // non-parallel setup). var ( - client, db = coderdtest.NewWithDatabase(t, nil) + wuTick = make(chan time.Time) + wuFlush = make(chan int, 1) + client, db = coderdtest.NewWithDatabase(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTick, + WorkspaceUsageTrackerFlush: wuFlush, + }) admin = coderdtest.CreateFirstUser(t, client) member, memberUser = coderdtest.CreateAnotherUser(t, client, admin.OrganizationID) workspace = runAgent(t, client, memberUser.ID, db) @@ -148,6 +154,13 @@ func TestPortForward(t *testing.T) { cancel() err = <-errC require.ErrorIs(t, err, context.Canceled) + + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) t.Run(c.name+"_TwoPorts", func(t *testing.T) { @@ -196,6 +209,13 @@ func TestPortForward(t *testing.T) { cancel() err = <-errC require.ErrorIs(t, err, context.Canceled) + + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) } @@ -257,6 +277,13 @@ func TestPortForward(t *testing.T) { cancel() err := <-errC require.ErrorIs(t, err, context.Canceled) + + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) } diff --git a/cli/server.go b/cli/server.go index 94648bb900..f371c30156 100644 --- a/cli/server.go +++ b/cli/server.go @@ -86,6 +86,7 @@ import ( stringutil "github.com/coder/coder/v2/coderd/util/strings" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/cryptorand" @@ -968,6 +969,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. purger := dbpurge.New(ctx, logger, options.Database) defer purger.Close() + // Updates workspace usage + tracker := workspaceusage.New(options.Database, + workspaceusage.WithLogger(logger.Named("workspace_usage_tracker")), + ) + options.WorkspaceUsageTracker = tracker + defer tracker.Close() + // Wrap the server in middleware that redirects to the access URL if // the request is not to a local IP. var handler http.Handler = coderAPI.RootHandler diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 0b59ae2e46..22f113f732 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -7592,6 +7592,35 @@ const docTemplate = `{ } } }, + "/workspaces/{workspace}/usage": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": [ + "Workspaces" + ], + "summary": "Post Workspace Usage by ID", + "operationId": "post-workspace-usage-by-id", + "parameters": [ + { + "type": "string", + "format": "uuid", + "description": "Workspace ID", + "name": "workspace", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + } + } + } + }, "/workspaces/{workspace}/watch": { "get": { "security": [ diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 3255e8711c..f0f55d7f6b 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -6711,6 +6711,33 @@ } } }, + "/workspaces/{workspace}/usage": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": ["Workspaces"], + "summary": "Post Workspace Usage by ID", + "operationId": "post-workspace-usage-by-id", + "parameters": [ + { + "type": "string", + "format": "uuid", + "description": "Workspace ID", + "name": "workspace", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + } + } + } + }, "/workspaces/{workspace}/watch": { "get": { "security": [ diff --git a/coderd/coderd.go b/coderd/coderd.go index bdd20512fa..6b89f4d095 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -66,6 +66,7 @@ import ( "github.com/coder/coder/v2/coderd/updatecheck" "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/provisionerd/proto" @@ -190,6 +191,9 @@ type Options struct { // NewTicker is used for unit tests to replace "time.NewTicker". NewTicker func(duration time.Duration) (tick <-chan time.Time, done func()) + + // WorkspaceUsageTracker tracks workspace usage by the CLI. + WorkspaceUsageTracker *workspaceusage.Tracker } // @title Coder API @@ -362,6 +366,12 @@ func New(options *Options) *API { OIDC: options.OIDCConfig, } + if options.WorkspaceUsageTracker == nil { + options.WorkspaceUsageTracker = workspaceusage.New(options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + ) + } + ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() @@ -405,6 +415,7 @@ func New(options *Options) *API { options.Logger.Named("acquirer"), options.Database, options.Pubsub), + workspaceUsageTracker: options.WorkspaceUsageTracker, } api.AppearanceFetcher.Store(&appearance.DefaultFetcher) @@ -972,6 +983,7 @@ func New(options *Options) *API { }) r.Get("/watch", api.watchWorkspace) r.Put("/extend", api.putExtendWorkspace) + r.Post("/usage", api.postWorkspaceUsage) r.Put("/dormant", api.putWorkspaceDormant) r.Put("/favorite", api.putFavoriteWorkspace) r.Delete("/favorite", api.deleteFavoriteWorkspace) @@ -1179,6 +1191,8 @@ type API struct { statsBatcher *batchstats.Batcher Acquirer *provisionerdserver.Acquirer + + workspaceUsageTracker *workspaceusage.Tracker } // Close waits for all WebSocket connections to drain before returning. @@ -1200,6 +1214,7 @@ func (api *API) Close() error { _ = (*coordinator).Close() } _ = api.agentProvider.Close() + api.workspaceUsageTracker.Close() return nil } diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 4d315c3e2b..303f840938 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -70,6 +70,7 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -146,6 +147,8 @@ type Options struct { WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions AllowWorkspaceRenames bool NewTicker func(duration time.Duration) (<-chan time.Time, func()) + WorkspaceUsageTrackerFlush chan int + WorkspaceUsageTrackerTick chan time.Time } // New constructs a codersdk client connected to an in-memory API instance. @@ -306,6 +309,36 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can hangDetector.Start() t.Cleanup(hangDetector.Close) + // Did last_used_at not update? Scratching your noggin? Here's why. + // Workspace usage tracking must be triggered manually in tests. + // The vast majority of existing tests do not depend on last_used_at + // and adding an extra time-based background goroutine to all existing + // tests may lead to future flakes and goleak complaints. + // Instead, pass in your own flush and ticker like so: + // + // tickCh = make(chan time.Time) + // flushCh = make(chan int, 1) + // client = coderdtest.New(t, &coderdtest.Options{ + // WorkspaceUsageTrackerFlush: flushCh, + // WorkspaceUsageTrackerTick: tickCh + // }) + // + // Now to trigger a tick, just write to `tickCh`. + // Reading from `flushCh` will ensure that workspaceusage.Tracker flushed. + // See TestPortForward or TestTracker_MultipleInstances for how this works in practice. + if options.WorkspaceUsageTrackerFlush == nil { + options.WorkspaceUsageTrackerFlush = make(chan int, 1) // buffering just in case + } + if options.WorkspaceUsageTrackerTick == nil { + options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case + } + // Close is called by API.Close() + wuTracker := workspaceusage.New( + options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + workspaceusage.WithTickFlush(options.WorkspaceUsageTrackerTick, options.WorkspaceUsageTrackerFlush), + ) + var mutex sync.RWMutex var handler http.Handler srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -454,6 +487,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions, AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, + WorkspaceUsageTracker: wuTracker, } } diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 7ed8d8397c..ac53fa586f 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -1046,6 +1046,10 @@ func (q *FakeQuerier) BatchUpdateWorkspaceLastUsedAt(_ context.Context, arg data if _, found := m[q.workspaces[i].ID]; !found { continue } + // WHERE last_used_at < @last_used_at + if !q.workspaces[i].LastUsedAt.Before(arg.LastUsedAt) { + continue + } q.workspaces[i].LastUsedAt = arg.LastUsedAt n++ } diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index e31546da3a..057c10d267 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -11504,6 +11504,9 @@ SET last_used_at = $1 WHERE id = ANY($2 :: uuid[]) +AND + -- Do not overwrite with older data + last_used_at < $1 ` type BatchUpdateWorkspaceLastUsedAtParams struct { diff --git a/coderd/database/queries/workspaces.sql b/coderd/database/queries/workspaces.sql index 0482fd5135..01c86cb41e 100644 --- a/coderd/database/queries/workspaces.sql +++ b/coderd/database/queries/workspaces.sql @@ -433,7 +433,10 @@ UPDATE SET last_used_at = @last_used_at WHERE - id = ANY(@ids :: uuid[]); + id = ANY(@ids :: uuid[]) +AND + -- Do not overwrite with older data + last_used_at < @last_used_at; -- name: GetDeploymentWorkspaceStats :one WITH workspaces_with_jobs AS ( diff --git a/coderd/workspaces.go b/coderd/workspaces.go index b4f5ab8621..f29d44d6d7 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -1084,6 +1084,24 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) { httpapi.Write(ctx, rw, code, resp) } +// @Summary Post Workspace Usage by ID +// @ID post-workspace-usage-by-id +// @Security CoderSessionToken +// @Tags Workspaces +// @Param workspace path string true "Workspace ID" format(uuid) +// @Success 204 +// @Router /workspaces/{workspace}/usage [post] +func (api *API) postWorkspaceUsage(rw http.ResponseWriter, r *http.Request) { + workspace := httpmw.WorkspaceParam(r) + if !api.Authorize(r, rbac.ActionUpdate, workspace) { + httpapi.Forbidden(rw) + return + } + + api.workspaceUsageTracker.Add(workspace.ID) + rw.WriteHeader(http.StatusNoContent) +} + // @Summary Favorite workspace by ID. // @ID favorite-workspace-by-id // @Security CoderSessionToken diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go new file mode 100644 index 0000000000..6a3659a500 --- /dev/null +++ b/coderd/workspaceusage/tracker.go @@ -0,0 +1,234 @@ +package workspaceusage + +import ( + "bytes" + "context" + "flag" + "os" + "sort" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" +) + +var DefaultFlushInterval = 60 * time.Second + +// Store is a subset of database.Store +type Store interface { + BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error +} + +// Tracker tracks and de-bounces updates to workspace usage activity. +// It keeps an internal map of workspace IDs that have been used and +// periodically flushes this to its configured Store. +type Tracker struct { + log slog.Logger // you know, for logs + flushLock sync.Mutex // protects m + flushErrors int // tracks the number of consecutive errors flushing + m *uuidSet // stores workspace ids + s Store // for flushing data + tickCh <-chan time.Time // controls flush interval + stopTick func() // stops flushing + stopCh chan struct{} // signals us to stop + stopOnce sync.Once // because you only stop once + doneCh chan struct{} // signifies that we have stopped + flushCh chan int // used for testing. +} + +// New returns a new Tracker. It is the caller's responsibility +// to call Close(). +func New(s Store, opts ...Option) *Tracker { + tr := &Tracker{ + log: slog.Make(sloghuman.Sink(os.Stderr)), + m: &uuidSet{}, + s: s, + tickCh: nil, + stopTick: nil, + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + flushCh: nil, + } + for _, opt := range opts { + opt(tr) + } + if tr.tickCh == nil && tr.stopTick == nil { + tick := time.NewTicker(DefaultFlushInterval) + tr.tickCh = tick.C + tr.stopTick = tick.Stop + } + go tr.loop() + return tr +} + +type Option func(*Tracker) + +// WithLogger sets the logger to be used by Tracker. +func WithLogger(log slog.Logger) Option { + return func(h *Tracker) { + h.log = log + } +} + +// WithFlushInterval allows configuring the flush interval of Tracker. +func WithFlushInterval(d time.Duration) Option { + return func(h *Tracker) { + ticker := time.NewTicker(d) + h.tickCh = ticker.C + h.stopTick = ticker.Stop + } +} + +// WithTickFlush allows passing two channels: one that reads +// a time.Time, and one that returns the number of marked workspaces +// every time Tracker flushes. +// For testing only and will panic if used outside of tests. +func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option { + if flag.Lookup("test.v") == nil { + panic("developer error: WithTickFlush is not to be used outside of tests.") + } + return func(h *Tracker) { + h.tickCh = tickCh + h.stopTick = func() {} + h.flushCh = flushCh + } +} + +// Add marks the workspace with the given ID as having been used recently. +// Tracker will periodically flush this to its configured Store. +func (tr *Tracker) Add(workspaceID uuid.UUID) { + tr.m.Add(workspaceID) +} + +// flush updates last_used_at of all current workspace IDs. +// If this is held while a previous flush is in progress, it will +// deadlock until the previous flush has completed. +func (tr *Tracker) flush(now time.Time) { + // Copy our current set of IDs + ids := tr.m.UniqueAndClear() + count := len(ids) + if tr.flushCh != nil { // only used for testing + defer func() { + tr.flushCh <- count + }() + } + if count == 0 { + tr.log.Debug(context.Background(), "nothing to flush") + return + } + + // Set a short-ish timeout for this. We don't want to hang forever. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // nolint: gocritic // system function + authCtx := dbauthz.AsSystemRestricted(ctx) + tr.flushLock.Lock() + defer tr.flushLock.Unlock() + if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }); err != nil { + // A single failure to flush is likely not a huge problem. If the workspace is still connected at + // the next iteration, either another coderd instance will likely have this data or the CLI + // will tell us again that the workspace is in use. + tr.flushErrors++ + if tr.flushErrors > 1 { + tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err)) + // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. + // How to surface it correctly to admins besides just screaming into the logs? + } else { + tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + } + return + } + tr.flushErrors = 0 + tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) +} + +// loop periodically flushes every tick. +// If loop is called after Close, it will exit immediately and log an error. +func (tr *Tracker) loop() { + select { + case <-tr.doneCh: + tr.log.Error(context.Background(), "developer error: Loop called after Close") + return + default: + } + defer func() { + close(tr.doneCh) + tr.log.Debug(context.Background(), "workspace usage tracker loop exited") + }() + for { + select { + case <-tr.stopCh: + return + case now, ok := <-tr.tickCh: + if !ok { + return + } + // NOTE: we do not update last_used_at with the time at which each workspace was added. + // Instead, we update with the time of the flush. If the BatchUpdateWorkspacesLastUsedAt + // query can be rewritten to update each id with a corresponding last_used_at timestamp + // then we could capture the exact usage time of each workspace. For now however, as + // we perform this query at a regular interval, the time of the flush is 'close enough' + // for the purposes of both dormancy (and for autostop, in future). + tr.flush(now.UTC()) + } + } +} + +// Close stops Tracker and returns once Loop has exited. +// After calling Close(), Loop must not be called. +func (tr *Tracker) Close() error { + tr.stopOnce.Do(func() { + tr.stopCh <- struct{}{} + tr.stopTick() + <-tr.doneCh + }) + return nil +} + +// uuidSet is a set of UUIDs. Safe for concurrent usage. +// The zero value can be used. +type uuidSet struct { + l sync.Mutex + m map[uuid.UUID]struct{} +} + +func (s *uuidSet) Add(id uuid.UUID) { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + } + s.m[id] = struct{}{} +} + +// UniqueAndClear returns the unique set of entries in s and +// resets the internal map. +func (s *uuidSet) UniqueAndClear() []uuid.UUID { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + return []uuid.UUID{} + } + l := make([]uuid.UUID, 0) + for k := range s.m { + l = append(l, k) + } + // For ease of testing, sort the IDs lexically + sort.Slice(l, func(i, j int) bool { + // For some unfathomable reason, byte arrays are not comparable? + // See https://github.com/golang/go/issues/61004 + return bytes.Compare(l[i][:], l[j][:]) < 0 + }) + clear(s.m) + return l +} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go new file mode 100644 index 0000000000..ae9a9d2162 --- /dev/null +++ b/coderd/workspaceusage/tracker_test.go @@ -0,0 +1,225 @@ +package workspaceusage_test + +import ( + "bytes" + "sort" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/mock/gomock" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/coderdtest" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbfake" + "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/workspaceusage" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/testutil" +) + +func TestTracker(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mDB := dbmock.NewMockStore(ctrl) + log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + tickCh := make(chan time.Time) + flushCh := make(chan int, 1) + wut := workspaceusage.New(mDB, + workspaceusage.WithLogger(log), + workspaceusage.WithTickFlush(tickCh, flushCh), + ) + defer wut.Close() + + // 1. No marked workspaces should imply no flush. + now := dbtime.Now() + tickCh <- now + count := <-flushCh + require.Equal(t, 0, count, "expected zero flushes") + + // 2. One marked workspace should cause a flush. + ids := []uuid.UUID{uuid.New()} + now = dbtime.Now() + wut.Add(ids[0]) + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }).Times(1) + tickCh <- now + count = <-flushCh + require.Equal(t, 1, count, "expected one flush with one id") + + // 3. Lots of marked workspaces should also cause a flush. + for i := 0; i < 31; i++ { + ids = append(ids, uuid.New()) + } + + // Sort ids so mDB know what to expect. + sort.Slice(ids, func(i, j int) bool { + return bytes.Compare(ids[i][:], ids[j][:]) < 0 + }) + + now = dbtime.Now() + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }) + for _, id := range ids { + wut.Add(id) + } + tickCh <- now + count = <-flushCh + require.Equal(t, len(ids), count, "incorrect number of ids flushed") + + // 4. Try to cause a race condition! + now = dbtime.Now() + // Difficult to know what to EXPECT here, so we won't check strictly here. + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), gomock.Any()).MinTimes(1).MaxTimes(len(ids)) + // Try to force a race condition. + var wg sync.WaitGroup + count = 0 + for i := 0; i < len(ids); i++ { + wg.Add(1) + go func() { + defer wg.Done() + tickCh <- now + }() + wut.Add(ids[i]) + } + + for i := 0; i < len(ids); i++ { + count += <-flushCh + } + + wg.Wait() + require.Equal(t, len(ids), count, "incorrect number of ids flushed") + + // 5. Closing multiple times should not be a problem. + wut.Close() + wut.Close() +} + +// This test performs a more 'integration-style' test with multiple instances. +func TestTracker_MultipleInstances(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test only makes sense with postgres") + } + + // Given we have two coderd instances connected to the same database + var ( + ctx = testutil.Context(t, testutil.WaitLong) + db, _ = dbtestutil.NewDB(t) + // real pubsub is not safe for concurrent use, and this test currently + // does not depend on pubsub + ps = pubsub.NewInMemory() + wuTickA = make(chan time.Time) + wuFlushA = make(chan int, 1) + wuTickB = make(chan time.Time) + wuFlushB = make(chan int, 1) + clientA = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTickA, + WorkspaceUsageTrackerFlush: wuFlushA, + Database: db, + Pubsub: ps, + }) + clientB = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTickB, + WorkspaceUsageTrackerFlush: wuFlushB, + Database: db, + Pubsub: ps, + }) + owner = coderdtest.CreateFirstUser(t, clientA) + now = dbtime.Now() + ) + + clientB.SetSessionToken(clientA.SessionToken()) + + // Create a number of workspaces + numWorkspaces := 10 + w := make([]dbfake.WorkspaceResponse, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + wr := dbfake.WorkspaceBuild(t, db, database.Workspace{ + OwnerID: owner.UserID, + OrganizationID: owner.OrganizationID, + LastUsedAt: now, + }).WithAgent().Do() + w[i] = wr + } + + // Use client A to update LastUsedAt of the first three + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[0].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[1].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[2].Workspace.ID)) + // Use client B to update LastUsedAt of the next three + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[3].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[4].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[5].Workspace.ID)) + // The next two will have updated from both instances + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + // The last two will not report any usage. + + // Tick both with different times and wait for both flushes to complete + nowA := now.Add(time.Minute) + nowB := now.Add(2 * time.Minute) + var wg sync.WaitGroup + var flushedA, flushedB int + wg.Add(1) + go func() { + defer wg.Done() + wuTickA <- nowA + flushedA = <-wuFlushA + }() + wg.Add(1) + go func() { + defer wg.Done() + wuTickB <- nowB + flushedB = <-wuFlushB + }() + wg.Wait() + + // We expect 5 flushed IDs each + require.Equal(t, 5, flushedA) + require.Equal(t, 5, flushedB) + + // Fetch updated workspaces + updated := make([]codersdk.Workspace, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + ws, err := clientA.Workspace(ctx, w[i].Workspace.ID) + require.NoError(t, err) + updated[i] = ws + } + // We expect the first three to have the timestamp of flushA + require.Equal(t, nowA.UTC(), updated[0].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[1].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[2].LastUsedAt.UTC()) + // We expect the next three to have the timestamp of flushB + require.Equal(t, nowB.UTC(), updated[3].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[4].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[5].LastUsedAt.UTC()) + // The next two should have the timestamp of flushB as it is newer than flushA + require.Equal(t, nowB.UTC(), updated[6].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[7].LastUsedAt.UTC()) + // And the last two should be untouched + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index ecdc99d777..0007e85de8 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -11,6 +11,8 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/tracing" ) @@ -314,6 +316,54 @@ func (c *Client) PutExtendWorkspace(ctx context.Context, id uuid.UUID, req PutEx return nil } +// PostWorkspaceUsage marks the workspace as having been used recently. +func (c *Client) PostWorkspaceUsage(ctx context.Context, id uuid.UUID) error { + path := fmt.Sprintf("/api/v2/workspaces/%s/usage", id.String()) + res, err := c.Request(ctx, http.MethodPost, path, nil) + if err != nil { + return xerrors.Errorf("post workspace usage: %w", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusNoContent { + return ReadBodyAsError(res) + } + return nil +} + +// UpdateWorkspaceUsageContext periodically posts workspace usage for the workspace +// with the given id in the background. +// The caller is responsible for calling the returned function to stop the background +// process. +func (c *Client) UpdateWorkspaceUsageContext(ctx context.Context, id uuid.UUID) func() { + hbCtx, hbCancel := context.WithCancel(ctx) + // Perform one initial update + if err := c.PostWorkspaceUsage(hbCtx, id); err != nil { + c.logger.Warn(ctx, "failed to post workspace usage", slog.Error(err)) + } + ticker := time.NewTicker(time.Minute) + doneCh := make(chan struct{}) + go func() { + defer func() { + ticker.Stop() + close(doneCh) + }() + for { + select { + case <-ticker.C: + if err := c.PostWorkspaceUsage(hbCtx, id); err != nil { + c.logger.Warn(ctx, "failed to post workspace usage in background", slog.Error(err)) + } + case <-hbCtx.Done(): + return + } + } + }() + return func() { + hbCancel() + <-doneCh + } +} + // UpdateWorkspaceDormancy is a request to activate or make a workspace dormant. // A value of false will activate a dormant workspace. type UpdateWorkspaceDormancy struct { diff --git a/docs/api/workspaces.md b/docs/api/workspaces.md index f176653a17..c16dd970a5 100644 --- a/docs/api/workspaces.md +++ b/docs/api/workspaces.md @@ -1385,6 +1385,32 @@ curl -X PUT http://coder-server:8080/api/v2/workspaces/{workspace}/ttl \ To perform this operation, you must be authenticated. [Learn more](authentication.md). +## Post Workspace Usage by ID + +### Code samples + +```shell +# Example request using curl +curl -X POST http://coder-server:8080/api/v2/workspaces/{workspace}/usage \ + -H 'Coder-Session-Token: API_KEY' +``` + +`POST /workspaces/{workspace}/usage` + +### Parameters + +| Name | In | Type | Required | Description | +| ----------- | ---- | ------------ | -------- | ------------ | +| `workspace` | path | string(uuid) | true | Workspace ID | + +### Responses + +| Status | Meaning | Description | Schema | +| ------ | --------------------------------------------------------------- | ----------- | ------ | +| 204 | [No Content](https://tools.ietf.org/html/rfc7231#section-6.3.5) | No Content | | + +To perform this operation, you must be authenticated. [Learn more](authentication.md). + ## Watch workspace by ID ### Code samples