From 1cd5f38cb0bf36ddcaffb1b00e9545bf7109d04e Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Wed, 25 Jan 2023 15:27:36 -0600 Subject: [PATCH] feat: add debug server for tailnet coordinators (#5861) Implements a Tailscale-like debug server for our in-memory coordinator. This should provide some visibility into why connections could be failing. Resolves: https://github.com/coder/coder/issues/5845 ![image](https://user-images.githubusercontent.com/6332295/214680832-2724d633-2d54-44d6-a7ce-5841e5824ee5.png) --- agent/agent_test.go | 2 +- coderd/apidoc/docs.go | 22 ++++ coderd/apidoc/swagger.json | 18 ++++ coderd/coderd.go | 19 ++++ coderd/coderdtest/authorize.go | 5 + coderd/coderdtest/swaggerparser.go | 5 +- coderd/debug.go | 14 +++ coderd/rbac/object.go | 5 + coderd/workspaceagents.go | 12 ++- coderd/wsconncache/wsconncache_test.go | 2 +- docs/api/debug.md | 21 ++++ docs/manifest.json | 4 + enterprise/tailnet/coordinator.go | 10 +- enterprise/tailnet/coordinator_test.go | 10 +- tailnet/coordinator.go | 136 +++++++++++++++++++++---- tailnet/coordinator_test.go | 10 +- 16 files changed, 261 insertions(+), 34 deletions(-) create mode 100644 coderd/debug.go create mode 100644 docs/api/debug.md diff --git a/agent/agent_test.go b/agent/agent_test.go index 0f0d93f923..384c967a7f 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1193,7 +1193,7 @@ func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) { } c.t.Cleanup(c.lastWorkspaceAgent) go func() { - _ = c.coordinator.ServeAgent(serverConn, c.agentID) + _ = c.coordinator.ServeAgent(serverConn, c.agentID, "") close(closed) }() return clientConn, nil diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index a14ebd4ad5..b288270655 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -362,6 +362,28 @@ const docTemplate = `{ } } }, + "/debug/coordinator": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "produces": [ + "text/html" + ], + "tags": [ + "Debug" + ], + "summary": "Debug Info Wireguard Coordinator", + "operationId": "debug-info-wireguard-coordinator", + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/entitlements": { "get": { "security": [ diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 53b5b41efd..ad3a306451 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -308,6 +308,24 @@ } } }, + "/debug/coordinator": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "produces": ["text/html"], + "tags": ["Debug"], + "summary": "Debug Info Wireguard Coordinator", + "operationId": "debug-info-wireguard-coordinator", + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/entitlements": { "get": { "security": [ diff --git a/coderd/coderd.go b/coderd/coderd.go index 6119497796..c22da2be6b 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -613,6 +613,25 @@ func New(options *Options) *API { r.Get("/", api.workspaceApplicationAuth) }) }) + + r.Route("/debug", func(r chi.Router) { + r.Use( + apiKeyMiddleware, + // Ensure only owners can access debug endpoints. + func(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if !api.Authorize(r, rbac.ActionRead, rbac.ResourceDebugInfo) { + httpapi.ResourceNotFound(rw) + return + } + + next.ServeHTTP(rw, r) + }) + }, + ) + + r.Get("/coordinator", api.debugCoordinator) + }) }) if options.SwaggerEndpoint { diff --git a/coderd/coderdtest/authorize.go b/coderd/coderdtest/authorize.go index 03ad70ed33..783ef5e964 100644 --- a/coderd/coderdtest/authorize.go +++ b/coderd/coderdtest/authorize.go @@ -272,6 +272,11 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) { AssertAction: rbac.ActionRead, AssertObject: rbac.ResourceTemplate, }, + + "GET:/api/v2/debug/coordinator": { + AssertAction: rbac.ActionRead, + AssertObject: rbac.ResourceDebugInfo, + }, } // Routes like proxy routes support all HTTP methods. A helper func to expand diff --git a/coderd/coderdtest/swaggerparser.go b/coderd/coderdtest/swaggerparser.go index c5d19cf088..8ff5b78871 100644 --- a/coderd/coderdtest/swaggerparser.go +++ b/coderd/coderdtest/swaggerparser.go @@ -327,7 +327,7 @@ func assertAccept(t *testing.T, comment SwaggerComment) { } } -var allowedProduceTypes = []string{"json", "text/event-stream"} +var allowedProduceTypes = []string{"json", "text/event-stream", "text/html"} func assertProduce(t *testing.T, comment SwaggerComment) { var hasResponseModel bool @@ -344,7 +344,8 @@ func assertProduce(t *testing.T, comment SwaggerComment) { } else { if (comment.router == "/workspaceagents/me/app-health" && comment.method == "post") || (comment.router == "/workspaceagents/me/version" && comment.method == "post") || - (comment.router == "/licenses/{id}" && comment.method == "delete") { + (comment.router == "/licenses/{id}" && comment.method == "delete") || + (comment.router == "/debug/coordinator" && comment.method == "get") { return // Exception: HTTP 200 is returned without response entity } diff --git a/coderd/debug.go b/coderd/debug.go new file mode 100644 index 0000000000..c22b77e564 --- /dev/null +++ b/coderd/debug.go @@ -0,0 +1,14 @@ +package coderd + +import "net/http" + +// @Summary Debug Info Wireguard Coordinator +// @ID debug-info-wireguard-coordinator +// @Security CoderSessionToken +// @Produce text/html +// @Tags Debug +// @Success 200 +// @Router /debug/coordinator [get] +func (api *API) debugCoordinator(rw http.ResponseWriter, r *http.Request) { + (*api.TailnetCoordinator.Load()).ServeHTTPDebug(rw, r) +} diff --git a/coderd/rbac/object.go b/coderd/rbac/object.go index 79dfd1b619..1ee606a33c 100644 --- a/coderd/rbac/object.go +++ b/coderd/rbac/object.go @@ -150,6 +150,11 @@ var ( ResourceReplicas = Object{ Type: "replicas", } + + // ResourceDebugInfo controls access to the debug routes `/api/v2/debug/*`. + ResourceDebugInfo = Object{ + Type: "debug_info", + } ) // Object is used to create objects for authz checks when you have none in diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 69d056f66d..0b28cc9833 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -521,6 +521,16 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request }) return } + + workspace, err := api.Database.GetWorkspaceByID(ctx, build.WorkspaceID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Internal error fetching workspace.", + Detail: err.Error(), + }) + return + } + // Ensure the resource is still valid! // We only accept agents for resources on the latest build. ensureLatestBuild := func() error { @@ -618,7 +628,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request closeChan := make(chan struct{}) go func() { defer close(closeChan) - err := (*api.TailnetCoordinator.Load()).ServeAgent(wsNetConn, workspaceAgent.ID) + err := (*api.TailnetCoordinator.Load()).ServeAgent(wsNetConn, workspaceAgent.ID, fmt.Sprintf("%s-%s", workspace.Name, workspaceAgent.Name)) if err != nil { api.Logger.Warn(ctx, "tailnet coordinator agent error", slog.Error(err)) _ = conn.Close(websocket.StatusInternalError, err.Error()) diff --git a/coderd/wsconncache/wsconncache_test.go b/coderd/wsconncache/wsconncache_test.go index a73e95e989..37ff4fc0b8 100644 --- a/coderd/wsconncache/wsconncache_test.go +++ b/coderd/wsconncache/wsconncache_test.go @@ -207,7 +207,7 @@ func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) { <-closed }) go func() { - _ = c.coordinator.ServeAgent(serverConn, c.agentID) + _ = c.coordinator.ServeAgent(serverConn, c.agentID, "") close(closed) }() return clientConn, nil diff --git a/docs/api/debug.md b/docs/api/debug.md new file mode 100644 index 0000000000..a32c9f6416 --- /dev/null +++ b/docs/api/debug.md @@ -0,0 +1,21 @@ +# Debug + +## Debug Info Wireguard Coordinator + +### Code samples + +```shell +# Example request using curl +curl -X GET http://coder-server:8080/api/v2/debug/coordinator \ + -H 'Coder-Session-Token: API_KEY' +``` + +`GET /debug/coordinator` + +### Responses + +| Status | Meaning | Description | Schema | +| ------ | ------------------------------------------------------- | ----------- | ------ | +| 200 | [OK](https://tools.ietf.org/html/rfc7231#section-6.3.1) | OK | | + +To perform this operation, you must be authenticated. [Learn more](authentication.md). diff --git a/docs/manifest.json b/docs/manifest.json index 25c72423e5..0c38e57068 100644 --- a/docs/manifest.json +++ b/docs/manifest.json @@ -364,6 +364,10 @@ "title": "Builds", "path": "./api/builds.md" }, + { + "title": "Debug", + "path": "./api/debug.md" + }, { "title": "Enterprise", "path": "./api/enterprise.md" diff --git a/enterprise/tailnet/coordinator.go b/enterprise/tailnet/coordinator.go index c24107fe2a..83def00ba1 100644 --- a/enterprise/tailnet/coordinator.go +++ b/enterprise/tailnet/coordinator.go @@ -5,8 +5,10 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "net" + "net/http" "sync" "time" @@ -174,7 +176,7 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js // ServeAgent accepts a WebSocket connection to an agent that listens to // incoming connections and publishes node updates. -func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { +func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error { // Tell clients on other instances to send a callmemaybe to us. err := c.publishAgentHello(id) if err != nil { @@ -573,3 +575,9 @@ func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte return buf.Bytes(), nil } + +func (*haCoordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + fmt.Fprintf(w, "

coordinator

") + fmt.Fprintf(w, "

ha debug coming soon

") +} diff --git a/enterprise/tailnet/coordinator_test.go b/enterprise/tailnet/coordinator_test.go index 86cee94dbd..cf85af4a5a 100644 --- a/enterprise/tailnet/coordinator_test.go +++ b/enterprise/tailnet/coordinator_test.go @@ -60,7 +60,7 @@ func TestCoordinatorSingle(t *testing.T) { id := uuid.New() closeChan := make(chan struct{}) go func() { - err := coordinator.ServeAgent(server, id) + err := coordinator.ServeAgent(server, id, "") assert.NoError(t, err) close(closeChan) }() @@ -91,7 +91,7 @@ func TestCoordinatorSingle(t *testing.T) { agentID := uuid.New() closeAgentChan := make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS, agentID) + err := coordinator.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() @@ -142,7 +142,7 @@ func TestCoordinatorSingle(t *testing.T) { }) closeAgentChan = make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS, agentID) + err := coordinator.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() @@ -184,7 +184,7 @@ func TestCoordinatorHA(t *testing.T) { agentID := uuid.New() closeAgentChan := make(chan struct{}) go func() { - err := coordinator1.ServeAgent(agentServerWS, agentID) + err := coordinator1.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() @@ -240,7 +240,7 @@ func TestCoordinatorHA(t *testing.T) { }) closeAgentChan = make(chan struct{}) go func() { - err := coordinator1.ServeAgent(agentServerWS, agentID) + err := coordinator1.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index 7c3f48c9ea..1373b370a5 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -4,10 +4,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "net" + "net/http" "net/netip" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -22,6 +25,9 @@ import ( // └──────────────────┘ └────────────────────┘ └───────────────────┘ └──────────────────┘ // Coordinators have different guarantees for HA support. type Coordinator interface { + // ServeHTTPDebug serves a debug webpage that shows the internal state of + // the coordinator. + ServeHTTPDebug(w http.ResponseWriter, r *http.Request) // Node returns an in-memory node by ID. Node(id uuid.UUID) *Node // ServeClient accepts a WebSocket connection that wants to connect to an agent @@ -29,7 +35,8 @@ type Coordinator interface { ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error // ServeAgent accepts a WebSocket connection to an agent that listens to // incoming connections and publishes node updates. - ServeAgent(conn net.Conn, id uuid.UUID) error + // Name is just used for debug information. It can be left blank. + ServeAgent(conn net.Conn, id uuid.UUID, name string) error // Close closes the coordinator. Close() error } @@ -104,8 +111,8 @@ func NewCoordinator() Coordinator { return &coordinator{ closed: false, nodes: map[uuid.UUID]*Node{}, - agentSockets: map[uuid.UUID]idConn{}, - agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{}, + agentSockets: map[uuid.UUID]*trackedConn{}, + agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*trackedConn{}, } } @@ -117,23 +124,34 @@ func NewCoordinator() Coordinator { // This coordinator is incompatible with multiple Coder // replicas as all node data is in-memory. type coordinator struct { - mutex sync.Mutex + mutex sync.RWMutex closed bool // nodes maps agent and connection IDs their respective node. nodes map[uuid.UUID]*Node // agentSockets maps agent IDs to their open websocket. - agentSockets map[uuid.UUID]idConn + agentSockets map[uuid.UUID]*trackedConn // agentToConnectionSockets maps agent IDs to connection IDs of conns that // are subscribed to updates for that agent. - agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]net.Conn + agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*trackedConn } -type idConn struct { +type trackedConn struct { + net.Conn + // id is an ephemeral UUID used to uniquely identify the owner of the // connection. - id uuid.UUID - conn net.Conn + id uuid.UUID + + name string + start int64 + lastWrite int64 + overwrites int64 +} + +func (t *trackedConn) Write(b []byte) (n int, err error) { + atomic.StoreInt64(&t.lastWrite, time.Now().Unix()) + return t.Conn.Write(b) } // Node returns an in-memory node by ID. @@ -182,12 +200,18 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) c.mutex.Lock() connectionSockets, ok := c.agentToConnectionSockets[agent] if !ok { - connectionSockets = map[uuid.UUID]net.Conn{} + connectionSockets = map[uuid.UUID]*trackedConn{} c.agentToConnectionSockets[agent] = connectionSockets } + + now := time.Now().Unix() // Insert this connection into a map so the agent // can publish node updates. - connectionSockets[id] = conn + connectionSockets[id] = &trackedConn{ + Conn: conn, + start: now, + lastWrite: now, + } c.mutex.Unlock() defer func() { c.mutex.Lock() @@ -243,7 +267,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json return xerrors.Errorf("marshal nodes: %w", err) } - _, err = agentSocket.conn.Write(data) + _, err = agentSocket.Write(data) if err != nil { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) { return nil @@ -256,7 +280,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json // ServeAgent accepts a WebSocket connection to an agent that // listens to incoming connections and publishes node updates. -func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { +func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error { c.mutex.Lock() if c.closed { c.mutex.Unlock() @@ -289,6 +313,8 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { // This uniquely identifies a connection that belongs to this goroutine. unique := uuid.New() + now := time.Now().Unix() + overwrites := int64(0) // If an old agent socket is connected, we close it to avoid any leaks. This // shouldn't ever occur because we expect one agent to be running, but it's @@ -297,11 +323,17 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { // dead. oldAgentSocket, ok := c.agentSockets[id] if ok { - _ = oldAgentSocket.conn.Close() + overwrites = oldAgentSocket.overwrites + 1 + _ = oldAgentSocket.Close() } - c.agentSockets[id] = idConn{ + c.agentSockets[id] = &trackedConn{ id: unique, - conn: conn, + Conn: conn, + + name: name, + start: now, + lastWrite: now, + overwrites: overwrites, } c.mutex.Unlock() @@ -311,7 +343,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error { // Only delete the connection if it's ours. It could have been // overwritten. - if idConn := c.agentSockets[id]; idConn.id == unique { + if idConn, ok := c.agentSockets[id]; ok && idConn.id == unique { delete(c.agentSockets, id) delete(c.nodes, id) } @@ -382,7 +414,7 @@ func (c *coordinator) Close() error { for _, socket := range c.agentSockets { socket := socket go func() { - _ = socket.conn.Close() + _ = socket.Close() wg.Done() }() } @@ -403,3 +435,71 @@ func (c *coordinator) Close() error { wg.Wait() return nil } + +func (c *coordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + now := time.Now() + + c.mutex.RLock() + defer c.mutex.RUnlock() + + fmt.Fprintln(w, "

in-memory wireguard coordinator debug

") + fmt.Fprintf(w, "

# agents: total %d

\n", len(c.agentSockets)) + fmt.Fprintln(w, "") + + missingAgents := map[uuid.UUID]map[uuid.UUID]*trackedConn{} + for agentID, conns := range c.agentToConnectionSockets { + if len(conns) == 0 { + continue + } + + if _, ok := c.agentSockets[agentID]; !ok { + missingAgents[agentID] = conns + } + } + + fmt.Fprintf(w, "

# missing agents: total %d

\n", len(missingAgents)) + fmt.Fprintln(w, "") +} diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go index 60d909f715..7dc90ff6f4 100644 --- a/tailnet/coordinator_test.go +++ b/tailnet/coordinator_test.go @@ -48,7 +48,7 @@ func TestCoordinator(t *testing.T) { id := uuid.New() closeChan := make(chan struct{}) go func() { - err := coordinator.ServeAgent(server, id) + err := coordinator.ServeAgent(server, id, "") assert.NoError(t, err) close(closeChan) }() @@ -76,7 +76,7 @@ func TestCoordinator(t *testing.T) { agentID := uuid.New() closeAgentChan := make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS, agentID) + err := coordinator.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() @@ -127,7 +127,7 @@ func TestCoordinator(t *testing.T) { }) closeAgentChan = make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS, agentID) + err := coordinator.ServeAgent(agentServerWS, agentID, "") assert.NoError(t, err) close(closeAgentChan) }() @@ -160,7 +160,7 @@ func TestCoordinator(t *testing.T) { agentID := uuid.New() closeAgentChan1 := make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS1, agentID) + err := coordinator.ServeAgent(agentServerWS1, agentID, "") assert.NoError(t, err) close(closeAgentChan1) }() @@ -205,7 +205,7 @@ func TestCoordinator(t *testing.T) { }) closeAgentChan2 := make(chan struct{}) go func() { - err := coordinator.ServeAgent(agentServerWS2, agentID) + err := coordinator.ServeAgent(agentServerWS2, agentID, "") assert.NoError(t, err) close(closeAgentChan2) }()