feat: use agent v2 API to fetch manifest (#11832)

Agent uses the v2 API to obtain the manifest, instead of the HTTP API.
This commit is contained in:
Spike Curtis 2024-01-30 10:11:28 +04:00 committed by GitHub
parent 9cf4e7f15a
commit da8bb1c198
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 91 additions and 83 deletions

View File

@ -88,7 +88,6 @@ type Options struct {
}
type Client interface {
Manifest(ctx context.Context) (agentsdk.Manifest, error)
Listen(ctx context.Context) (drpc.Conn, error)
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
@ -96,6 +95,7 @@ type Client interface {
PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
RewriteDERPMap(derpMap *tailcfg.DERPMap)
}
type Agent interface {
@ -713,15 +713,20 @@ func (a *agent) run(ctx context.Context) error {
serviceBanner := agentsdk.ServiceBannerFromProto(sbp)
a.serviceBanner.Store(&serviceBanner)
manifest, err := a.client.Manifest(ctx)
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
if err != nil {
return xerrors.Errorf("fetch metadata: %w", err)
}
a.logger.Info(ctx, "fetched manifest", slog.F("manifest", manifest))
a.logger.Info(ctx, "fetched manifest", slog.F("manifest", mp))
manifest, err := agentsdk.ManifestFromProto(mp)
if err != nil {
a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err))
return xerrors.Errorf("convert manifest: %w", err)
}
if manifest.AgentID == uuid.Nil {
return xerrors.New("nil agentID returned by manifest")
}
a.client.RewriteDERPMap(manifest.DERPMap)
// Expand the directory and send it back to coderd so external
// applications that rely on the directory can use it.
@ -1124,6 +1129,7 @@ func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, networ
return xerrors.Errorf("recv DERPMap error: %w", err)
}
dm := tailnet.DERPMapFromProto(dmp)
a.client.RewriteDERPMap(dm)
network.SetDERPMap(dm)
}
}

View File

@ -2043,6 +2043,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
if metadata.WorkspaceName == "" {
metadata.WorkspaceName = "test-workspace"
}
if metadata.WorkspaceID == uuid.Nil {
metadata.WorkspaceID = uuid.New()
}
coordinator := tailnet.NewCoordinator(logger)
t.Cleanup(func() {
_ = coordinator.Close()

View File

@ -49,7 +49,9 @@ func NewClient(t testing.TB,
}
err := proto.DRPCRegisterTailnet(mux, drpcService)
require.NoError(t, err)
fakeAAPI := NewFakeAgentAPI(t, logger)
mp, err := agentsdk.ProtoFromManifest(manifest)
require.NoError(t, err)
fakeAAPI := NewFakeAgentAPI(t, logger, mp)
err = agentproto.DRPCRegisterAgent(mux, fakeAAPI)
require.NoError(t, err)
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
@ -64,7 +66,6 @@ func NewClient(t testing.TB,
t: t,
logger: logger.Named("client"),
agentID: agentID,
manifest: manifest,
statsChan: statsChan,
coordinator: coordinator,
server: server,
@ -77,7 +78,6 @@ type Client struct {
t testing.TB
logger slog.Logger
agentID uuid.UUID
manifest agentsdk.Manifest
metadata map[string]agentsdk.Metadata
statsChan chan *agentsdk.Stats
coordinator tailnet.Coordinator
@ -94,14 +94,12 @@ type Client struct {
derpMapOnce sync.Once
}
func (*Client) RewriteDERPMap(*tailcfg.DERPMap) {}
func (c *Client) Close() {
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
}
func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
return c.manifest, nil
}
func (c *Client) Listen(ctx context.Context) (drpc.Conn, error) {
conn, lis := drpcsdk.MemTransportPipe()
c.LastWorkspaceAgent = func() {
@ -252,12 +250,13 @@ type FakeAgentAPI struct {
t testing.TB
logger slog.Logger
manifest *agentproto.Manifest
getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
}
func (*FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
// TODO implement me
panic("implement me")
func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
return f.manifest, nil
}
func (f *FakeAgentAPI) SetServiceBannerFunc(fn func() (codersdk.ServiceBannerConfig, error)) {
@ -310,9 +309,10 @@ func (*FakeAgentAPI) BatchCreateLogs(context.Context, *agentproto.BatchCreateLog
panic("implement me")
}
func NewFakeAgentAPI(t testing.TB, logger slog.Logger) *FakeAgentAPI {
func NewFakeAgentAPI(t testing.TB, logger slog.Logger, manifest *agentproto.Manifest) *FakeAgentAPI {
return &FakeAgentAPI{
t: t,
logger: logger.Named("FakeAgentAPI"),
t: t,
logger: logger.Named("FakeAgentAPI"),
manifest: manifest,
}
}

View File

@ -97,6 +97,16 @@ func New(opts Options) *API {
AgentFn: api.agent,
Database: opts.Database,
DerpMapFn: opts.DerpMapFn,
WorkspaceIDFn: func(ctx context.Context, wa *database.WorkspaceAgent) (uuid.UUID, error) {
if opts.WorkspaceID != uuid.Nil {
return opts.WorkspaceID, nil
}
ws, err := opts.Database.GetWorkspaceByAgentID(ctx, wa.ID)
if err != nil {
return uuid.Nil, err
}
return ws.Workspace.ID, nil
},
}
api.ServiceBannerAPI = &ServiceBannerAPI{

View File

@ -23,6 +23,7 @@ import (
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agenttest"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/coderdtest/oidctest"
@ -500,8 +501,7 @@ func TestWorkspaceAgentTailnetDirectDisabled(t *testing.T) {
// Verify that the manifest has DisableDirectConnections set to true.
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(r.AgentToken)
manifest, err := agentClient.Manifest(ctx)
require.NoError(t, err)
manifest := requireGetManifest(ctx, t, agentClient)
require.True(t, manifest.DisableDirectConnections)
_ = agenttest.New(t, client.URL, r.AgentToken)
@ -825,11 +825,10 @@ func TestWorkspaceAgentAppHealth(t *testing.T) {
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(r.AgentToken)
manifest, err := agentClient.Manifest(ctx)
require.NoError(t, err)
manifest := requireGetManifest(ctx, t, agentClient)
require.EqualValues(t, codersdk.WorkspaceAppHealthDisabled, manifest.Apps[0].Health)
require.EqualValues(t, codersdk.WorkspaceAppHealthInitializing, manifest.Apps[1].Health)
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{})
err := agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{})
require.Error(t, err)
// empty
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{})
@ -855,8 +854,7 @@ func TestWorkspaceAgentAppHealth(t *testing.T) {
},
})
require.NoError(t, err)
manifest, err = agentClient.Manifest(ctx)
require.NoError(t, err)
manifest = requireGetManifest(ctx, t, agentClient)
require.EqualValues(t, codersdk.WorkspaceAppHealthHealthy, manifest.Apps[1].Health)
// update to unhealthy
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{
@ -865,8 +863,7 @@ func TestWorkspaceAgentAppHealth(t *testing.T) {
},
})
require.NoError(t, err)
manifest, err = agentClient.Manifest(ctx)
require.NoError(t, err)
manifest = requireGetManifest(ctx, t, agentClient)
require.EqualValues(t, codersdk.WorkspaceAppHealthUnhealthy, manifest.Apps[1].Health)
}
@ -1110,8 +1107,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
ctx := testutil.Context(t, testutil.WaitMedium)
manifest, err := agentClient.Manifest(ctx)
require.NoError(t, err)
manifest := requireGetManifest(ctx, t, agentClient)
// Verify manifest API response.
require.Equal(t, workspace.ID, manifest.WorkspaceID)
@ -1282,8 +1278,7 @@ func TestWorkspaceAgent_Metadata_CatchMemoryLeak(t *testing.T) {
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitSuperLong))
manifest, err := agentClient.Manifest(ctx)
require.NoError(t, err)
manifest := requireGetManifest(ctx, t, agentClient)
post := func(ctx context.Context, key, value string) error {
return agentClient.PostMetadata(ctx, agentsdk.PostMetadataRequest{
@ -1630,3 +1625,18 @@ func TestWorkspaceAgentExternalAuthListen(t *testing.T) {
require.Equal(t, 1, validateCalls, "validate calls duplicated on same token")
})
}
func requireGetManifest(ctx context.Context, t testing.TB, client agent.Client) agentsdk.Manifest {
conn, err := client.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := conn.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(conn)
mp, err := aAPI.GetManifest(ctx, &agentproto.GetManifestRequest{})
require.NoError(t, err)
manifest, err := agentsdk.ManifestFromProto(mp)
require.NoError(t, err)
return manifest
}

View File

@ -20,6 +20,7 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspaceapps/appurl"
@ -397,7 +398,10 @@ func createWorkspaceWithApps(t *testing.T, client *codersdk.Client, orgID uuid.U
primaryAppHost, err := client.AppHost(appHostCtx)
require.NoError(t, err)
if primaryAppHost.Host != "" {
manifest, err := agentClient.Manifest(appHostCtx)
rpcConn, err := agentClient.Listen(appHostCtx)
require.NoError(t, err)
aAPI := agentproto.NewDRPCAgentClient(rpcConn)
manifest, err := aAPI.GetManifest(appHostCtx, &agentproto.GetManifestRequest{})
require.NoError(t, err)
appHost := appurl.ApplicationURL{
@ -408,7 +412,9 @@ func createWorkspaceWithApps(t *testing.T, client *codersdk.Client, orgID uuid.U
Username: me.Username,
}
proxyURL := "http://" + appHost.String() + strings.ReplaceAll(primaryAppHost.Host, "*", "")
require.Equal(t, manifest.VSCodePortProxyURI, proxyURL)
require.Equal(t, manifest.VsCodePortProxyUri, proxyURL)
err = rpcConn.Close()
require.NoError(t, err)
}
agentCloser := agent.New(agent.Options{
Client: agentClient,

View File

@ -135,35 +135,13 @@ type Script struct {
Script string `json:"script"`
}
// Manifest fetches manifest for the currently authenticated workspace agent.
func (c *Client) Manifest(ctx context.Context) (Manifest, error) {
res, err := c.SDK.Request(ctx, http.MethodGet, "/api/v2/workspaceagents/me/manifest", nil)
if err != nil {
return Manifest{}, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return Manifest{}, codersdk.ReadBodyAsError(res)
}
var agentMeta Manifest
err = json.NewDecoder(res.Body).Decode(&agentMeta)
if err != nil {
return Manifest{}, err
}
err = c.rewriteDerpMap(agentMeta.DERPMap)
if err != nil {
return Manifest{}, err
}
return agentMeta, nil
}
// rewriteDerpMap rewrites the DERP map to use the access URL of the SDK as the
// RewriteDERPMap rewrites the DERP map to use the access URL of the SDK as the
// "embedded relay" access URL. The passed derp map is modified in place.
//
// Agents can provide an arbitrary access URL that may be different that the
// globally configured one. This breaks the built-in DERP, which would continue
// to reference the global access URL.
func (c *Client) rewriteDerpMap(derpMap *tailcfg.DERPMap) error {
func (c *Client) RewriteDERPMap(derpMap *tailcfg.DERPMap) {
accessingPort := c.SDK.URL.Port()
if accessingPort == "" {
accessingPort = "80"
@ -173,7 +151,9 @@ func (c *Client) rewriteDerpMap(derpMap *tailcfg.DERPMap) error {
}
accessPort, err := strconv.Atoi(accessingPort)
if err != nil {
return xerrors.Errorf("convert accessing port %q: %w", accessingPort, err)
// this should never happen because URL.Port() returns the empty string if the port is not
// valid.
c.SDK.Logger().Critical(context.Background(), "failed to parse URL port", slog.F("port", accessingPort))
}
for _, region := range derpMap.Regions {
if !region.EmbeddedRelay {
@ -189,7 +169,6 @@ func (c *Client) rewriteDerpMap(derpMap *tailcfg.DERPMap) error {
node.ForceHTTP = c.SDK.URL.Scheme == "http"
}
}
return nil
}
// Listen connects to the workspace agent API WebSocket

View File

@ -5,7 +5,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"
@ -20,37 +19,32 @@ import (
"github.com/coder/coder/v2/testutil"
)
func TestWorkspaceAgentMetadata(t *testing.T) {
func TestWorkspaceRewriteDERPMap(t *testing.T) {
t.Parallel()
// This test ensures that the DERP map returned properly
// mutates built-in DERPs with the client access URL.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpapi.Write(context.Background(), w, http.StatusOK, agentsdk.Manifest{
DERPMap: &tailcfg.DERPMap{
Regions: map[int]*tailcfg.DERPRegion{
1: {
EmbeddedRelay: true,
RegionID: 1,
Nodes: []*tailcfg.DERPNode{{
HostName: "bananas.org",
DERPPort: 1,
}},
},
},
// This test ensures that RewriteDERPMap mutates built-in DERPs with the
// client access URL.
dm := &tailcfg.DERPMap{
Regions: map[int]*tailcfg.DERPRegion{
1: {
EmbeddedRelay: true,
RegionID: 1,
Nodes: []*tailcfg.DERPNode{{
HostName: "bananas.org",
DERPPort: 1,
}},
},
})
}))
parsed, err := url.Parse(srv.URL)
},
}
parsed, err := url.Parse("https://coconuts.org:44558")
require.NoError(t, err)
client := agentsdk.New(parsed)
manifest, err := client.Manifest(context.Background())
require.NoError(t, err)
region := manifest.DERPMap.Regions[1]
client.RewriteDERPMap(dm)
region := dm.Regions[1]
require.True(t, region.EmbeddedRelay)
require.Len(t, region.Nodes, 1)
node := region.Nodes[0]
require.Equal(t, parsed.Hostname(), node.HostName)
require.Equal(t, parsed.Port(), strconv.Itoa(node.DERPPort))
require.Equal(t, "coconuts.org", node.HostName)
require.Equal(t, 44558, node.DERPPort)
}
func TestAgentReportStats(t *testing.T) {