fix(coderd): allow `workspaceAgentLogs` follow to return on non-latest-build (#9382)

This commit is contained in:
Mathias Fredriksson 2023-08-28 22:46:42 +03:00 committed by GitHub
parent fea8813f13
commit 487bdc2e08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
@ -481,6 +482,15 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
return
}
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace by agent id.",
Detail: err.Error(),
})
return
}
api.WebsocketWaitMutex.Lock()
api.WebsocketWaitGroup.Add(1)
api.WebsocketWaitMutex.Unlock()
@ -556,7 +566,8 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
go func() {
defer close(bufferedLogs)
for {
keepGoing := true
for keepGoing {
select {
case <-ctx.Done():
return
@ -565,6 +576,18 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
t.Reset(recheckInterval)
}
agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID)
if err != nil {
if xerrors.Is(err, context.Canceled) {
return
}
logger.Warn(ctx, "failed to get workspace agents in latest build", slog.Error(err))
continue
}
// If the agent is no longer in the latest build, we can stop after
// checking once.
keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID })
logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
AgentID: workspaceAgent.ID,
CreatedAfter: lastSentLogID,

View File

@ -242,6 +242,91 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
require.Equal(t, "testing", logChunk[0].Output)
require.Equal(t, "testing2", logChunk[1].Output)
})
t.Run("Close logs on outdated build", func(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitMedium)
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.PlanComplete,
ProvisionApply: []*proto.Response{{
Type: &proto.Response_Apply{
Apply: &proto.ApplyComplete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
build := coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)
err := agentClient.PatchLogs(ctx, agentsdk.PatchLogs{
Logs: []agentsdk.Log{
{
CreatedAt: database.Now(),
Output: "testing",
},
},
})
require.NoError(t, err)
logs, closer, err := client.WorkspaceAgentLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
require.NoError(t, err)
defer func() {
_ = closer.Close()
}()
first := make(chan struct{})
go func() {
select {
case <-ctx.Done():
assert.Fail(t, "context done while waiting in goroutine")
case <-logs:
close(first)
}
}()
select {
case <-ctx.Done():
require.FailNow(t, "context done while waiting for first log")
case <-first:
}
_ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
// Send a new log message to trigger a re-check.
err = agentClient.PatchLogs(ctx, agentsdk.PatchLogs{
Logs: []agentsdk.Log{
{
CreatedAt: database.Now(),
Output: "testing2",
},
},
})
require.NoError(t, err)
select {
case <-ctx.Done():
require.FailNow(t, "context done while waiting for logs close")
case <-logs:
}
})
t.Run("PublishesOnOverflow", func(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitMedium)