feat: Make workspace watching realtime instead of polling (#4922)

* feat: Make workspace watching realtime instead of polling

This was leading to performance issues on the frontend, where
the page should only be rendered if changes occur. While this
could be changed on the frontend, it was always the intention
to make this socket ~realtime anyways.

* Fix workspace tests waiting, erroring on workspace update, and add comments to workspace events
This commit is contained in:
Kyle Carberry 2022-11-07 07:25:18 -08:00 committed by GitHub
parent a5cc1970cf
commit 56b963a940
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 239 additions and 77 deletions

View File

@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
newDeadline := database.Now().Add(bumpAmount)
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: build.ProvisionerState,

View File

@ -2825,7 +2825,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
return sql.ErrNoRows
}
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
@ -2837,9 +2837,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
workspaceBuild.ProvisionerState = arg.ProvisionerState
workspaceBuild.Deadline = arg.Deadline
q.workspaceBuilds[index] = workspaceBuild
return nil
return workspaceBuild, nil
}
return sql.ErrNoRows
return database.WorkspaceBuild{}, sql.ErrNoRows
}
func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {

View File

@ -187,7 +187,7 @@ type sqlcQuerier interface {
UpdateWorkspaceAgentVersionByID(ctx context.Context, arg UpdateWorkspaceAgentVersionByIDParams) error
UpdateWorkspaceAppHealthByID(ctx context.Context, arg UpdateWorkspaceAppHealthByIDParams) error
UpdateWorkspaceAutostart(ctx context.Context, arg UpdateWorkspaceAutostartParams) error
UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error
UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error)
UpdateWorkspaceDeletedByID(ctx context.Context, arg UpdateWorkspaceDeletedByIDParams) error
UpdateWorkspaceLastUsedAt(ctx context.Context, arg UpdateWorkspaceLastUsedAtParams) error
UpdateWorkspaceTTL(ctx context.Context, arg UpdateWorkspaceTTLParams) error

View File

@ -5487,7 +5487,7 @@ func (q *sqlQuerier) InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspa
return i, err
}
const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :exec
const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :one
UPDATE
workspace_builds
SET
@ -5495,7 +5495,7 @@ SET
provisioner_state = $3,
deadline = $4
WHERE
id = $1
id = $1 RETURNING id, created_at, updated_at, workspace_id, template_version_id, build_number, transition, initiator_id, provisioner_state, job_id, deadline, reason
`
type UpdateWorkspaceBuildByIDParams struct {
@ -5505,14 +5505,29 @@ type UpdateWorkspaceBuildByIDParams struct {
Deadline time.Time `db:"deadline" json:"deadline"`
}
func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error {
_, err := q.db.ExecContext(ctx, updateWorkspaceBuildByID,
func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error) {
row := q.db.QueryRowContext(ctx, updateWorkspaceBuildByID,
arg.ID,
arg.UpdatedAt,
arg.ProvisionerState,
arg.Deadline,
)
return err
var i WorkspaceBuild
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.WorkspaceID,
&i.TemplateVersionID,
&i.BuildNumber,
&i.Transition,
&i.InitiatorID,
&i.ProvisionerState,
&i.JobID,
&i.Deadline,
&i.Reason,
)
return i, err
}
const getWorkspaceResourceByID = `-- name: GetWorkspaceResourceByID :one

View File

@ -124,7 +124,7 @@ INSERT INTO
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
-- name: UpdateWorkspaceBuildByID :exec
-- name: UpdateWorkspaceBuildByID :one
UPDATE
workspace_builds
SET
@ -132,4 +132,4 @@ SET
provisioner_state = $3,
deadline = $4
WHERE
id = $1;
id = $1 RETURNING *;

View File

@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)
_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
if err != nil {
return err
}
err = enc.Encode(sse.Data)
if err != nil {
return err
if sse.Data != nil {
_, err = buf.WriteString("data: ")
if err != nil {
return err
}
err = enc.Encode(sse.Data)
if err != nil {
return err
}
}
err = buf.WriteByte('\n')

View File

@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, failJob(fmt.Sprintf("get owner: %s", err))
}
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
if err != nil {
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
}
// Compute parameters for the workspace to consume.
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
@ -547,7 +551,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
}
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: input.WorkspaceBuildID,
UpdatedAt: database.Now(),
ProvisionerState: jobType.WorkspaceBuild.State,
@ -556,6 +560,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("update workspace build state: %w", err)
}
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.FailedJob_TemplateImport_:
}
@ -661,7 +669,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return xerrors.Errorf("update provisioner job: %w", err)
}
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: workspaceBuild.ID,
Deadline: workspaceDeadline,
ProvisionerState: jobType.WorkspaceBuild.State,
@ -696,6 +704,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return nil, xerrors.Errorf("complete job: %w", err)
}
err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.CompletedJob_TemplateDryRun_:
for _, resource := range jobType.TemplateDryRun.Resources {
server.Logger.Info(ctx, "inserting template dry-run job resource",

View File

@ -539,6 +539,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
Valid: true,
}
_ = updateConnectionTimes()
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
}()
err = updateConnectionTimes()
@ -546,6 +547,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
_ = conn.Close(websocket.StatusGoingAway, err.Error())
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
// End span so we don't get long lived trace data.
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
}
}
resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace resource.",
Detail: err.Error(),
})
return
}
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace build.",
Detail: err.Error(),
})
return
}
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(r.Context(), workspace.ID)
httpapi.Write(r.Context(), rw, http.StatusOK, nil)
}

View File

@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
return
}
api.publishWorkspaceUpdate(ctx, workspace.ID)
httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
}
@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
})
return
}
api.publishWorkspaceUpdate(ctx, workspace.ID)
httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
Message: "Job has been marked as canceled...",
})

View File

@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
return
}
api.publishWorkspaceUpdate(ctx, workspace.ID)
aReq.New = newWorkspace
rw.WriteHeader(http.StatusNoContent)
}
@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
return err
}
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: build.UpdatedAt,
ProvisionerState: build.ProvisionerState,
@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
// Ignore all trace spans after this, they're not too useful.
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)
t := time.NewTicker(time.Second * 1)
defer t.Stop()
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error subscribing to workspace events.",
Detail: err.Error(),
},
})
return
}
defer cancelSubscribe()
// An initial ping signals to the request that the server is now ready
// and the client can begin servicing a channel with data.
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypePing,
})
for {
select {
case <-ctx.Done():
return
case <-senderClosed:
return
case <-t.C:
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
}
}
}
@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes
return parts
}
func watchWorkspaceChannel(id uuid.UUID) string {
return fmt.Sprintf("workspace:%s", id)
}
func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
if err != nil {
api.Logger.Warn(ctx, "failed to publish workspace update",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}

View File

@ -12,6 +12,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/agent"
"github.com/coder/coder/coderd/audit"
"github.com/coder/coder/coderd/autobuild/schedule"
"github.com/coder/coder/coderd/coderdtest"
@ -1347,27 +1351,86 @@ func TestWorkspaceExtend(t *testing.T) {
func TestWorkspaceWatcher(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
client, closeFunc := coderdtest.NewWithProvisionerCloser(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client)
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, nil)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionDryRun: echo.ProvisionComplete,
Provision: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
w, err := client.Workspace(ctx, workspace.ID)
wc, err := client.WatchWorkspace(ctx, workspace.ID)
require.NoError(t, err)
wc, err := client.WatchWorkspace(ctx, w.ID)
require.NoError(t, err)
for i := 0; i < 3; i++ {
_, more := <-wc
require.True(t, more)
wait := func() {
select {
case <-ctx.Done():
t.Fail()
case <-wc:
}
}
coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
// the workspace build being created
wait()
// the workspace build being acquired
wait()
// the workspace build completing
wait()
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
})
defer func() {
_ = agentCloser.Close()
}()
// the agent connected
wait()
agentCloser.Close()
// the agent disconnected
wait()
closeFunc.Close()
build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
// First is for the workspace build itself
wait()
err = client.CancelWorkspaceBuild(ctx, build.ID)
require.NoError(t, err)
// Second is for the build cancel
wait()
err = client.UpdateWorkspace(ctx, workspace.ID, codersdk.UpdateWorkspaceRequest{
Name: "another",
})
require.NoError(t, err)
wait()
cancel()
require.EqualValues(t, codersdk.Workspace{}, <-wc)
}
func mustLocation(t *testing.T, location string) *time.Location {

View File

@ -161,18 +161,19 @@ func (c *Client) WatchWorkspace(ctx context.Context, id uuid.UUID) (<-chan Works
if err != nil {
return
}
if sse.Type == ServerSentEventTypeData {
var ws Workspace
b, ok := sse.Data.([]byte)
if !ok {
return
}
err = json.Unmarshal(b, &ws)
if err != nil {
return
}
wc <- ws
if sse.Type != ServerSentEventTypeData {
continue
}
var ws Workspace
b, ok := sse.Data.([]byte)
if !ok {
return
}
err = json.Unmarshal(b, &ws)
if err != nil {
return
}
wc <- ws
}
}
}()