fix: ensure wsproxy `MultiAgent` is closed when websocket dies (#11414)

The `SingleTailnet` behavior only checked to see if the `MultiAgent` was
closed, but the websocket error was not being propogated into the
`MultiAgent`, causing it to never be swapped for a new working one.

Fixes https://github.com/coder/coder/issues/11401

Before:
```
Coder Workspace Proxy v0.0.0-devel+85ff030 - Your Self-Hosted Remote Development Platform
Started HTTP listener at http://0.0.0.0:3001

View the Web UI: http://127.0.0.1:3001

==> Logs will stream in below (press ctrl+c to gracefully exit):
2024-01-04 20:11:56.376 [warn]  net.workspace-proxy.servertailnet: broadcast server node to agents ...
    error= write message:
               github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*remoteMultiAgentHandler).writeJSON
                   /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:524
             - failed to write msg: WebSocket closed: failed to read frame header: EOF
```

After:
```
Coder Workspace Proxy v0.0.0-devel+12f1878 - Your Self-Hosted Remote Development Platform
Started HTTP listener at http://0.0.0.0:3001

View the Web UI: http://127.0.0.1:3001

==> Logs will stream in below (press ctrl+c to gracefully exit):
2024-01-04 20:26:38.545 [warn]  net.workspace-proxy.servertailnet: multiagent closed, reinitializing
2024-01-04 20:26:38.546 [erro]  net.workspace-proxy.servertailnet: reinit multi agent ...
    error= dial coordinate websocket:
               github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*Client).DialCoordinator
                   /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:454
             - failed to WebSocket dial: failed to send handshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refused
2024-01-04 20:26:38.587 [erro]  net.workspace-proxy.servertailnet: reinit multi agent ...
    error= dial coordinate websocket:
               github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk.(*Client).DialCoordinator
                   /home/coder/coder/enterprise/wsproxy/wsproxysdk/wsproxysdk.go:454
             - failed to WebSocket dial: failed to send handshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refusedhandshake request: Get "http://127.0.0.1:3000/api/v2/workspaceproxies/me/coordinate": dial tcp 127.0.0.1:3000: connect: connection refused
2024-01-04 20:26:40.446 [info]  net.workspace-proxy.servertailnet: successfully reinitialized multiagent  agents=0  took=1.900892615s
```
This commit is contained in:
Colin Adler 2024-01-11 11:37:09 -06:00 committed by GitHub
parent d708ac7c04
commit 4a0808259a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 10 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"cdr.dev/slog"
"nhooyr.io/websocket"
)
@ -26,10 +27,10 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) {
}
}
// Heartbeat loops to ping a WebSocket to keep it alive. It kills the connection
// on ping failure.
func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) {
ticker := time.NewTicker(30 * time.Second)
// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping
// failure.
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
@ -41,6 +42,7 @@ func HeartbeatClose(ctx context.Context, exit func(), conn *websocket.Conn) {
err := conn.Ping(ctx)
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, "Ping failed")
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err))
exit()
return
}

View File

@ -224,6 +224,7 @@ func (s *ServerTailnet) watchAgentUpdates() {
nodes, ok := conn.NextUpdate(s.ctx)
if !ok {
if conn.IsClosed() && s.ctx.Err() == nil {
s.logger.Warn(s.ctx, "multiagent closed, reinitializing")
s.reinitCoordinator()
continue
}
@ -247,6 +248,7 @@ func (s *ServerTailnet) getAgentConn() tailnet.MultiAgentConn {
}
func (s *ServerTailnet) reinitCoordinator() {
start := time.Now()
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(s.ctx); {
s.nodesMu.Lock()
agentConn, err := s.getMultiAgent(s.ctx)
@ -264,6 +266,11 @@ func (s *ServerTailnet) reinitCoordinator() {
s.logger.Warn(s.ctx, "resubscribe to agent", slog.Error(err), slog.F("agent_id", agentID))
}
}
s.logger.Info(s.ctx, "successfully reinitialized multiagent",
slog.F("agents", len(s.agentConnectionTimes)),
slog.F("took", time.Since(start)),
)
s.nodesMu.Unlock()
return
}

View File

@ -431,6 +431,7 @@ type CoordinateNodes struct {
func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, error) {
ctx, cancel := context.WithCancel(ctx)
logger := c.SDKClient.Logger().Named("multiagent")
coordinateURL, err := c.SDKClient.URL.Parse("/api/v2/workspaceproxies/me/coordinate")
if err != nil {
@ -454,12 +455,13 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
return nil, xerrors.Errorf("dial coordinate websocket: %w", err)
}
go httpapi.HeartbeatClose(ctx, cancel, conn)
go httpapi.HeartbeatClose(ctx, logger, cancel, conn)
nc := websocket.NetConn(ctx, conn, websocket.MessageText)
rma := remoteMultiAgentHandler{
sdk: c,
nc: nc,
cancel: cancel,
legacyAgentCache: map[uuid.UUID]bool{},
}
@ -472,6 +474,11 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
OnRemove: func(agpl.Queue) { conn.Close(websocket.StatusGoingAway, "closed") },
}).Init()
go func() {
<-ctx.Done()
ma.Close()
}()
go func() {
defer cancel()
dec := json.NewDecoder(nc)
@ -480,16 +487,17 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
err := dec.Decode(&msg)
if err != nil {
if xerrors.Is(err, io.EOF) {
logger.Info(ctx, "websocket connection severed", slog.Error(err))
return
}
c.SDKClient.Logger().Error(ctx, "failed to decode coordinator nodes", slog.Error(err))
logger.Error(ctx, "decode coordinator nodes", slog.Error(err))
return
}
err = ma.Enqueue(msg.Nodes)
if err != nil {
c.SDKClient.Logger().Error(ctx, "enqueue nodes from coordinator", slog.Error(err))
logger.Error(ctx, "enqueue nodes from coordinator", slog.Error(err))
continue
}
}
@ -499,8 +507,9 @@ func (c *Client) DialCoordinator(ctx context.Context) (agpl.MultiAgentConn, erro
}
type remoteMultiAgentHandler struct {
sdk *Client
nc net.Conn
sdk *Client
nc net.Conn
cancel func()
legacyMu sync.RWMutex
legacyAgentCache map[uuid.UUID]bool
@ -517,10 +526,12 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error {
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung.
err = a.nc.SetWriteDeadline(time.Now().Add(agpl.WriteTimeout))
if err != nil {
a.cancel()
return xerrors.Errorf("set write deadline: %w", err)
}
_, err = a.nc.Write(data)
if err != nil {
a.cancel()
return xerrors.Errorf("write message: %w", err)
}
@ -531,6 +542,7 @@ func (a *remoteMultiAgentHandler) writeJSON(v interface{}) error {
// our successful write, it is important that we reset the deadline before it fires.
err = a.nc.SetWriteDeadline(time.Time{})
if err != nil {
a.cancel()
return xerrors.Errorf("clear write deadline: %w", err)
}
@ -573,7 +585,7 @@ func (a *remoteMultiAgentHandler) AgentIsLegacy(agentID uuid.UUID) bool {
return a.sdk.AgentIsLegacy(ctx, agentID)
})
if err != nil {
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.Error(err))
a.sdk.SDKClient.Logger().Error(ctx, "failed to check agent legacy status", slog.F("agent_id", agentID), slog.Error(err))
// Assume that the agent is legacy since this failed, while less
// efficient it will always work.