diff --git a/agent/agent.go b/agent/agent.go index b1bd45d50d..fe81786b14 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -90,7 +90,6 @@ type Options struct { type Client interface { ConnectRPC(ctx context.Context) (drpc.Conn, error) - PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error RewriteDERPMap(derpMap *tailcfg.DERPMap) } @@ -298,7 +297,6 @@ func (a *agent) init() { // may be happening, but regardless after the intermittent // failure, you'll want the agent to reconnect. func (a *agent) runLoop() { - go a.reportMetadataUntilGracefulShutdown() go a.manageProcessPriorityUntilGracefulShutdown() // need to keep retrying up to the hardCtx so that we can send graceful shutdown-related @@ -405,9 +403,7 @@ func (t *trySingleflight) Do(key string, fn func()) { fn() } -func (a *agent) reportMetadataUntilGracefulShutdown() { - // metadata reporting can cease as soon as we start gracefully shutting down. - ctx := a.gracefulCtx +func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error { tickerDone := make(chan struct{}) collectDone := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) @@ -567,51 +563,55 @@ func (a *agent) reportMetadataUntilGracefulShutdown() { var ( updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult) reportTimeout = 30 * time.Second - reportSemaphore = make(chan struct{}, 1) + reportError = make(chan error, 1) + reportInFlight = false + aAPI = proto.NewDRPCAgentClient(conn) ) - reportSemaphore <- struct{}{} for { select { case <-ctx.Done(): - return + return ctx.Err() case mr := <-metadataResults: // This can overwrite unsent values, but that's fine because // we're only interested about up-to-date values. updatedMetadata[mr.key] = mr.result continue - case <-report: - if len(updatedMetadata) > 0 { - select { - case <-reportSemaphore: - default: - // If there's already a report in flight, don't send - // another one, wait for next tick instead. - continue - } - - metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata)) - for key, result := range updatedMetadata { - metadata = append(metadata, agentsdk.Metadata{ - Key: key, - WorkspaceAgentMetadataResult: *result, - }) - delete(updatedMetadata, key) - } - - go func() { - ctx, cancel := context.WithTimeout(ctx, reportTimeout) - defer func() { - cancel() - reportSemaphore <- struct{}{} - }() - - err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata}) - if err != nil { - a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err)) - } - }() + case err := <-reportError: + a.logger.Debug(ctx, "batch update metadata complete", slog.Error(err)) + if err != nil { + return xerrors.Errorf("failed to report metadata: %w", err) } + reportInFlight = false + case <-report: + if len(updatedMetadata) == 0 { + continue + } + if reportInFlight { + // If there's already a report in flight, don't send + // another one, wait for next tick instead. + a.logger.Debug(ctx, "skipped metadata report tick because report is in flight") + continue + } + metadata := make([]*proto.Metadata, 0, len(updatedMetadata)) + for key, result := range updatedMetadata { + pr := agentsdk.ProtoFromMetadataResult(*result) + metadata = append(metadata, &proto.Metadata{ + Key: key, + Result: pr, + }) + delete(updatedMetadata, key) + } + + reportInFlight = true + go func() { + a.logger.Debug(ctx, "batch updating metadata") + ctx, cancel := context.WithTimeout(ctx, reportTimeout) + defer cancel() + + _, err := aAPI.BatchUpdateMetadata(ctx, &proto.BatchUpdateMetadataRequest{Metadata: metadata}) + reportError <- err + }() } } } @@ -783,6 +783,9 @@ func (a *agent) run() (retErr error) { // lifecycle reporting has to be via gracefulShutdownBehaviorRemain connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle) + // metadata reporting can cease as soon as we start gracefully shutting down + connMan.start("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata) + // channels to sync goroutines below // handle manifest // | diff --git a/agent/agenttest/client.go b/agent/agenttest/client.go index 19dc19372b..040edddb6f 100644 --- a/agent/agenttest/client.go +++ b/agent/agenttest/client.go @@ -82,7 +82,6 @@ type Client struct { t testing.TB logger slog.Logger agentID uuid.UUID - metadata map[string]agentsdk.Metadata coordinator tailnet.Coordinator server *drpcserver.Server fakeAgentAPI *FakeAgentAPI @@ -131,22 +130,7 @@ func (c *Client) GetStartup() <-chan *agentproto.Startup { } func (c *Client) GetMetadata() map[string]agentsdk.Metadata { - c.mu.Lock() - defer c.mu.Unlock() - return maps.Clone(c.metadata) -} - -func (c *Client) PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error { - c.mu.Lock() - defer c.mu.Unlock() - if c.metadata == nil { - c.metadata = make(map[string]agentsdk.Metadata) - } - for _, md := range req.Metadata { - c.metadata[md.Key] = md - c.logger.Debug(ctx, "post metadata", slog.F("key", md.Key), slog.F("md", md)) - } - return nil + return c.fakeAgentAPI.GetMetadata() } func (c *Client) GetStartupLogs() []agentsdk.Log { @@ -186,6 +170,7 @@ type FakeAgentAPI struct { appHealthCh chan *agentproto.BatchUpdateAppHealthRequest logsCh chan<- *agentproto.BatchCreateLogsRequest lifecycleStates []codersdk.WorkspaceAgentLifecycle + metadata map[string]agentsdk.Metadata getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error) } @@ -254,9 +239,24 @@ func (f *FakeAgentAPI) UpdateStartup(_ context.Context, req *agentproto.UpdateSt return req.GetStartup(), nil } -func (*FakeAgentAPI) BatchUpdateMetadata(context.Context, *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) { - // TODO implement me - panic("implement me") +func (f *FakeAgentAPI) GetMetadata() map[string]agentsdk.Metadata { + f.Lock() + defer f.Unlock() + return maps.Clone(f.metadata) +} + +func (f *FakeAgentAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) { + f.Lock() + defer f.Unlock() + if f.metadata == nil { + f.metadata = make(map[string]agentsdk.Metadata) + } + for _, md := range req.Metadata { + smd := agentsdk.MetadataFromProto(md) + f.metadata[md.Key] = smd + f.logger.Debug(ctx, "post metadata", slog.F("key", md.Key), slog.F("md", md)) + } + return &agentproto.BatchUpdateMetadataResponse{}, nil } func (f *FakeAgentAPI) SetLogsChannel(ch chan<- *agentproto.BatchCreateLogsRequest) { diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index bde518cb76..75bec0047e 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -85,6 +85,9 @@ type PostMetadataRequest struct { // performance. type PostMetadataRequestDeprecated = codersdk.WorkspaceAgentMetadataResult +// PostMetadata posts agent metadata to the Coder server. +// +// Deprecated: use BatchUpdateMetadata on the agent dRPC API instead func (c *Client) PostMetadata(ctx context.Context, req PostMetadataRequest) error { res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/metadata", req) if err != nil { diff --git a/codersdk/agentsdk/convert.go b/codersdk/agentsdk/convert.go index c872a81b1d..8671d9e0b5 100644 --- a/codersdk/agentsdk/convert.go +++ b/codersdk/agentsdk/convert.go @@ -112,6 +112,31 @@ func ProtoFromMetadataDescription(d codersdk.WorkspaceAgentMetadataDescription) } } +func ProtoFromMetadataResult(r codersdk.WorkspaceAgentMetadataResult) *proto.WorkspaceAgentMetadata_Result { + return &proto.WorkspaceAgentMetadata_Result{ + CollectedAt: timestamppb.New(r.CollectedAt), + Age: r.Age, + Value: r.Value, + Error: r.Error, + } +} + +func MetadataResultFromProto(r *proto.WorkspaceAgentMetadata_Result) codersdk.WorkspaceAgentMetadataResult { + return codersdk.WorkspaceAgentMetadataResult{ + CollectedAt: r.GetCollectedAt().AsTime(), + Age: r.GetAge(), + Value: r.GetValue(), + Error: r.GetError(), + } +} + +func MetadataFromProto(m *proto.Metadata) Metadata { + return Metadata{ + Key: m.GetKey(), + WorkspaceAgentMetadataResult: MetadataResultFromProto(m.GetResult()), + } +} + func AgentScriptsFromProto(protoScripts []*proto.WorkspaceAgentScript) ([]codersdk.WorkspaceAgentScript, error) { ret := make([]codersdk.WorkspaceAgentScript, len(protoScripts)) for i, protoScript := range protoScripts { diff --git a/codersdk/agentsdk/convert_test.go b/codersdk/agentsdk/convert_test.go index 3519408b6f..ce40f53c88 100644 --- a/codersdk/agentsdk/convert_test.go +++ b/codersdk/agentsdk/convert_test.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" "tailscale.com/tailcfg" "github.com/coder/coder/v2/agent/proto" @@ -176,3 +177,42 @@ func TestProtoFromLifecycle(t *testing.T) { require.Equal(t, s, state) } } + +func TestProtoFromMetadataResult(t *testing.T) { + t.Parallel() + now := dbtime.Now() + result := codersdk.WorkspaceAgentMetadataResult{ + CollectedAt: now, + Age: 4, + Value: "lemons", + Error: "rats", + } + pr := agentsdk.ProtoFromMetadataResult(result) + require.NotNil(t, pr) + require.Equal(t, now, pr.CollectedAt.AsTime()) + require.EqualValues(t, 4, pr.Age) + require.Equal(t, "lemons", pr.Value) + require.Equal(t, "rats", pr.Error) + result2 := agentsdk.MetadataResultFromProto(pr) + require.Equal(t, result, result2) +} + +func TestMetadataFromProto(t *testing.T) { + t.Parallel() + now := dbtime.Now() + pmd := &proto.Metadata{ + Key: "a flat", + Result: &proto.WorkspaceAgentMetadata_Result{ + CollectedAt: timestamppb.New(now), + Age: 88, + Value: "lemons", + Error: "rats", + }, + } + smd := agentsdk.MetadataFromProto(pmd) + require.Equal(t, "a flat", smd.Key) + require.Equal(t, now, smd.CollectedAt) + require.EqualValues(t, 88, smd.Age) + require.Equal(t, "lemons", smd.Value) + require.Equal(t, "rats", smd.Error) +}