mirror of https://github.com/coder/coder.git
fix(coderd): subscribe to workspace when streaming agent logs to detect outdated build (#9729)
Fixes #9721
This commit is contained in:
parent
87d50f17a2
commit
530dd9d247
|
@ -570,11 +570,27 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|||
lastSentLogID = logs[len(logs)-1].ID
|
||||
}
|
||||
|
||||
workspaceNotifyCh := make(chan struct{}, 1)
|
||||
notifyCh := make(chan struct{}, 1)
|
||||
// Allow us to immediately check if we missed any logs
|
||||
// between initial fetch and subscribe.
|
||||
notifyCh <- struct{}{}
|
||||
|
||||
// Subscribe to workspace to detect new builds.
|
||||
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, _ []byte) {
|
||||
select {
|
||||
case workspaceNotifyCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to subscribe to workspace for log streaming.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
defer closeSubscribeWorkspace()
|
||||
// Subscribe early to prevent missing log events.
|
||||
closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
|
||||
// The message is not important, we're tracking lastSentLogID manually.
|
||||
|
@ -585,7 +601,7 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to subscribe to logs.",
|
||||
Message: "Failed to subscribe to agent for log streaming.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
|
@ -600,20 +616,33 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|||
defer t.Stop()
|
||||
|
||||
go func() {
|
||||
defer close(bufferedLogs)
|
||||
defer func() {
|
||||
logger.Debug(ctx, "end log streaming loop")
|
||||
close(bufferedLogs)
|
||||
}()
|
||||
logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID))
|
||||
|
||||
keepGoing := true
|
||||
for keepGoing {
|
||||
var (
|
||||
debugTriggeredBy string
|
||||
onlyCheckLatestBuild bool
|
||||
)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
debugTriggeredBy = "timer"
|
||||
case <-workspaceNotifyCh:
|
||||
debugTriggeredBy = "workspace"
|
||||
onlyCheckLatestBuild = true
|
||||
case <-notifyCh:
|
||||
debugTriggeredBy = "log"
|
||||
t.Reset(recheckInterval)
|
||||
}
|
||||
|
||||
agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID)
|
||||
if err != nil {
|
||||
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
|
||||
if xerrors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
@ -624,6 +653,20 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|||
// checking once.
|
||||
keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID })
|
||||
|
||||
logger.Debug(
|
||||
ctx,
|
||||
"checking for new logs",
|
||||
slog.F("triggered_by", debugTriggeredBy),
|
||||
slog.F("only_check_latest_build", onlyCheckLatestBuild),
|
||||
slog.F("keep_going", keepGoing),
|
||||
slog.F("last_sent_log_id", lastSentLogID),
|
||||
slog.F("workspace_has_agents", len(agents) > 0),
|
||||
)
|
||||
|
||||
if onlyCheckLatestBuild && keepGoing {
|
||||
continue
|
||||
}
|
||||
|
||||
logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAfter: lastSentLogID,
|
||||
|
|
|
@ -385,34 +385,14 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
|
|||
_ = closer.Close()
|
||||
}()
|
||||
|
||||
first := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
assert.Fail(t, "context done while waiting in goroutine")
|
||||
case <-logs:
|
||||
close(first)
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.FailNow(t, "context done while waiting for first log")
|
||||
case <-first:
|
||||
case <-logs:
|
||||
}
|
||||
|
||||
_ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
|
||||
|
||||
// Send a new log message to trigger a re-check.
|
||||
err = agentClient.PatchLogs(ctx, agentsdk.PatchLogs{
|
||||
Logs: []agentsdk.Log{
|
||||
{
|
||||
CreatedAt: dbtime.Now(),
|
||||
Output: "testing2",
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.FailNow(t, "context done while waiting for logs close")
|
||||
|
|
|
@ -703,7 +703,7 @@ func (c *Client) WorkspaceAgentLogsAfter(ctx context.Context, agentID uuid.UUID,
|
|||
}
|
||||
return nil, nil, ReadBodyAsError(res)
|
||||
}
|
||||
logChunks := make(chan []WorkspaceAgentLog)
|
||||
logChunks := make(chan []WorkspaceAgentLog, 1)
|
||||
closed := make(chan struct{})
|
||||
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
|
||||
decoder := json.NewDecoder(wsNetConn)
|
||||
|
|
Loading…
Reference in New Issue