feat(support): fetch data concurrently (#12385)

Modifies pkg support to fetch data concurrently
This commit is contained in:
Cian Johnston 2024-03-05 17:41:42 +00:00 committed by GitHub
parent fb88fa8603
commit 5106d9fc47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 130 additions and 70 deletions

View File

@ -6,6 +6,7 @@ import (
"net/http"
"strings"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/google/uuid"
@ -61,84 +62,114 @@ type Deps struct {
}
func DeploymentInfo(ctx context.Context, client *codersdk.Client, log slog.Logger) Deployment {
var d Deployment
// Note: each goroutine assigns to a different struct field, hence no mutex.
var (
d Deployment
eg errgroup.Group
)
bi, err := client.BuildInfo(ctx)
if err != nil {
log.Error(ctx, "fetch build info", slog.Error(err))
} else {
eg.Go(func() error {
bi, err := client.BuildInfo(ctx)
if err != nil {
return xerrors.Errorf("fetch build info: %w", err)
}
d.BuildInfo = &bi
}
return nil
})
dc, err := client.DeploymentConfig(ctx)
if err != nil {
log.Error(ctx, "fetch deployment config", slog.Error(err))
} else {
eg.Go(func() error {
dc, err := client.DeploymentConfig(ctx)
if err != nil {
return xerrors.Errorf("fetch deployment config: %w", err)
}
d.Config = dc
}
return nil
})
hr, err := client.DebugHealth(ctx)
if err != nil {
log.Error(ctx, "fetch health report", slog.Error(err))
} else {
eg.Go(func() error {
hr, err := client.DebugHealth(ctx)
if err != nil {
return xerrors.Errorf("fetch health report: %w", err)
}
d.HealthReport = &hr
}
return nil
})
exp, err := client.Experiments(ctx)
if err != nil {
log.Error(ctx, "fetch experiments", slog.Error(err))
} else {
eg.Go(func() error {
exp, err := client.Experiments(ctx)
if err != nil {
return xerrors.Errorf("fetch experiments: %w", err)
}
d.Experiments = exp
return nil
})
if err := eg.Wait(); err != nil {
log.Error(ctx, "fetch deployment information", slog.Error(err))
}
return d
}
func NetworkInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, agentID uuid.UUID) Network {
var n Network
var (
n Network
eg errgroup.Group
)
coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil)
if err != nil {
log.Error(ctx, "fetch coordinator debug page", slog.Error(err))
} else {
eg.Go(func() error {
coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil)
if err != nil {
return xerrors.Errorf("fetch coordinator debug page: %w", err)
}
defer coordResp.Body.Close()
bs, err := io.ReadAll(coordResp.Body)
if err != nil {
log.Error(ctx, "read coordinator debug page", slog.Error(err))
} else {
n.CoordinatorDebug = string(bs)
return xerrors.Errorf("read coordinator debug page: %w", err)
}
}
n.CoordinatorDebug = string(bs)
return nil
})
tailResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/tailnet", nil)
if err != nil {
log.Error(ctx, "fetch tailnet debug page", slog.Error(err))
} else {
eg.Go(func() error {
tailResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/tailnet", nil)
if err != nil {
return xerrors.Errorf("fetch tailnet debug page: %w", err)
}
defer tailResp.Body.Close()
bs, err := io.ReadAll(tailResp.Body)
if err != nil {
log.Error(ctx, "read tailnet debug page", slog.Error(err))
} else {
n.TailnetDebug = string(bs)
return xerrors.Errorf("read tailnet debug page: %w", err)
}
}
n.TailnetDebug = string(bs)
return nil
})
if agentID != uuid.Nil {
eg.Go(func() error {
if agentID == uuid.Nil {
log.Warn(ctx, "agent id required for agent connection info")
return nil
}
connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID)
if err != nil {
log.Error(ctx, "fetch agent conn info", slog.Error(err), slog.F("agent_id", agentID.String()))
} else {
n.NetcheckLocal = &connInfo
return xerrors.Errorf("fetch agent conn info: %w", err)
}
} else {
log.Warn(ctx, "agent id required for agent connection info")
n.NetcheckLocal = &connInfo
return nil
})
if err := eg.Wait(); err != nil {
log.Error(ctx, "fetch network information", slog.Error(err))
}
return n
}
func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, workspaceID, agentID uuid.UUID) Workspace {
var w Workspace
var (
w Workspace
eg errgroup.Group
)
if workspaceID == uuid.Nil {
log.Error(ctx, "no workspace id specified")
@ -149,43 +180,57 @@ func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger
log.Error(ctx, "no agent id specified")
}
// dependency, cannot fetch concurrently
ws, err := client.Workspace(ctx, workspaceID)
if err != nil {
log.Error(ctx, "fetch workspace", slog.Error(err), slog.F("workspace_id", workspaceID))
return w
}
agt, err := client.WorkspaceAgent(ctx, agentID)
if err != nil {
log.Error(ctx, "fetch workspace agent", slog.Error(err), slog.F("agent_id", agentID))
}
w.Workspace = ws
w.Agent = agt
buildLogCh, closer, err := client.WorkspaceBuildLogsAfter(ctx, ws.LatestBuild.ID, 0)
if err != nil {
log.Error(ctx, "fetch provisioner job logs", slog.Error(err), slog.F("job_id", ws.LatestBuild.Job.ID.String()))
} else {
eg.Go(func() error {
agt, err := client.WorkspaceAgent(ctx, agentID)
if err != nil {
return xerrors.Errorf("fetch workspace agent: %w", err)
}
w.Agent = agt
return nil
})
eg.Go(func() error {
buildLogCh, closer, err := client.WorkspaceBuildLogsAfter(ctx, ws.LatestBuild.ID, 0)
if err != nil {
return xerrors.Errorf("fetch provisioner job logs: %w", err)
}
defer closer.Close()
var logs []codersdk.ProvisionerJobLog
for log := range buildLogCh {
w.BuildLogs = append(w.BuildLogs, log)
logs = append(w.BuildLogs, log)
}
}
w.BuildLogs = logs
return nil
})
if len(w.Workspace.LatestBuild.Resources) == 0 {
log.Warn(ctx, "workspace build has no resources")
return w
}
agentLogCh, closer, err := client.WorkspaceAgentLogsAfter(ctx, agentID, 0, false)
if err != nil {
log.Error(ctx, "fetch agent startup logs", slog.Error(err), slog.F("agent_id", agentID.String()))
} else {
eg.Go(func() error {
if len(w.Workspace.LatestBuild.Resources) == 0 {
log.Warn(ctx, "workspace build has no resources")
return nil
}
agentLogCh, closer, err := client.WorkspaceAgentLogsAfter(ctx, agentID, 0, false)
if err != nil {
return xerrors.Errorf("fetch agent startup logs: %w", err)
}
defer closer.Close()
var logs []codersdk.WorkspaceAgentLog
for logChunk := range agentLogCh {
w.AgentStartupLogs = append(w.AgentStartupLogs, logChunk...)
logs = append(w.AgentStartupLogs, logChunk...)
}
w.AgentStartupLogs = logs
return nil
})
if err := eg.Wait(); err != nil {
log.Error(ctx, "fetch workspace information", slog.Error(err))
}
return w
@ -225,9 +270,24 @@ func Run(ctx context.Context, d *Deps) (*Bundle, error) {
}
}
b.Deployment = DeploymentInfo(ctx, d.Client, d.Log)
b.Workspace = WorkspaceInfo(ctx, d.Client, d.Log, d.WorkspaceID, d.AgentID)
b.Network = NetworkInfo(ctx, d.Client, d.Log, d.AgentID)
var eg errgroup.Group
eg.Go(func() error {
di := DeploymentInfo(ctx, d.Client, d.Log)
b.Deployment = di
return nil
})
eg.Go(func() error {
wi := WorkspaceInfo(ctx, d.Client, d.Log, d.WorkspaceID, d.AgentID)
b.Workspace = wi
return nil
})
eg.Go(func() error {
ni := NetworkInfo(ctx, d.Client, d.Log, d.AgentID)
b.Network = ni
return nil
})
_ = eg.Wait()
return &b, nil
}