From dc3d39baf83df4345405611b7b01458cd47b6033 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Fri, 5 May 2023 20:29:03 +0400 Subject: [PATCH] fix: agent disconnects from coordinator (#7430) * work around websocket deadline bug Signed-off-by: Spike Curtis * Use test context to hold websocket open Signed-off-by: Spike Curtis * Fix race creating test websocket Signed-off-by: Spike Curtis * set write deadline to time.Time zero Signed-off-by: Spike Curtis --------- Signed-off-by: Spike Curtis --- tailnet/coordinator.go | 19 ++++++++++++- tailnet/coordinator_test.go | 55 +++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index 2f11566ded..5ee49cd194 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -206,6 +206,10 @@ func (t *TrackedConn) Close() error { return t.conn.Close() } +// WriteTimeout is the amount of time we wait to write a node update to a connection before we declare it hung. +// It is exported so that tests can use it. +const WriteTimeout = time.Second * 5 + // SendUpdates reads node updates and writes them to the connection. Ends when writes hit an error or context is // canceled. func (t *TrackedConn) SendUpdates() { @@ -223,7 +227,7 @@ func (t *TrackedConn) SendUpdates() { // Set a deadline so that hung connections don't put back pressure on the system. // Node updates are tiny, so even the dinkiest connection can handle them if it's not hung. - err = t.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + err = t.conn.SetWriteDeadline(time.Now().Add(WriteTimeout)) if err != nil { // often, this is just because the connection is closed/broken, so only log at debug. t.logger.Debug(t.ctx, "unable to set write deadline", slog.Error(err)) @@ -238,6 +242,19 @@ func (t *TrackedConn) SendUpdates() { return } t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes)) + + // nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are + // *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write() + // fails. What nhooyr.io/websocket does is set a timer, after which it expires the websocket write context. + // If this timer fires, then the next write will fail *even if we set a new write deadline*. So, after + // our successful write, it is important that we reset the deadline before it fires. + err = t.conn.SetWriteDeadline(time.Time{}) + if err != nil { + // often, this is just because the connection is closed/broken, so only log at debug. + t.logger.Debug(t.ctx, "unable to extend write deadline", slog.Error(err)) + _ = t.Close() + return + } } } } diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go index 407f5bb2cf..94c6f6da58 100644 --- a/tailnet/coordinator_test.go +++ b/tailnet/coordinator_test.go @@ -1,11 +1,16 @@ package tailnet_test import ( + "context" "encoding/json" "net" + "net/http" + "net/http/httptest" "testing" "time" + "nhooyr.io/websocket" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" @@ -74,7 +79,10 @@ func TestCoordinator(t *testing.T) { logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) coordinator := tailnet.NewCoordinator(logger) - agentWS, agentServerWS := net.Pipe() + // in this test we use real websockets to test use of deadlines + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong) + defer cancel() + agentWS, agentServerWS := websocketConn(ctx, t) defer agentWS.Close() agentNodeChan := make(chan []*tailnet.Node) sendAgentNode, agentErrChan := tailnet.ServeCoordinator(agentWS, func(nodes []*tailnet.Node) error { @@ -93,7 +101,7 @@ func TestCoordinator(t *testing.T) { return coordinator.Node(agentID) != nil }, testutil.WaitShort, testutil.IntervalFast) - clientWS, clientServerWS := net.Pipe() + clientWS, clientServerWS := websocketConn(ctx, t) defer clientWS.Close() defer clientServerWS.Close() clientNodeChan := make(chan []*tailnet.Node) @@ -108,16 +116,28 @@ func TestCoordinator(t *testing.T) { assert.NoError(t, err) close(closeClientChan) }() - agentNodes := <-clientNodeChan - require.Len(t, agentNodes, 1) + select { + case agentNodes := <-clientNodeChan: + require.Len(t, agentNodes, 1) + case <-ctx.Done(): + t.Fatal("timed out") + } sendClientNode(&tailnet.Node{}) clientNodes := <-agentNodeChan require.Len(t, clientNodes, 1) + // wait longer than the internal wait timeout. + // this tests for regression of https://github.com/coder/coder/issues/7428 + time.Sleep(tailnet.WriteTimeout * 3 / 2) + // Ensure an update to the agent node reaches the client! sendAgentNode(&tailnet.Node{}) - agentNodes = <-clientNodeChan - require.Len(t, agentNodes, 1) + select { + case agentNodes := <-clientNodeChan: + require.Len(t, agentNodes, 1) + case <-ctx.Done(): + t.Fatal("timed out") + } // Close the agent WebSocket so a new one can connect. err := agentWS.Close() @@ -334,3 +354,26 @@ func TestCoordinator_AgentUpdateWhileClientConnects(t *testing.T) { require.Len(t, cNodes, 1) require.Equal(t, 1, cNodes[0].PreferredDERP) } + +func websocketConn(ctx context.Context, t *testing.T) (client net.Conn, server net.Conn) { + t.Helper() + sc := make(chan net.Conn, 1) + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + wss, err := websocket.Accept(rw, r, nil) + require.NoError(t, err) + conn := websocket.NetConn(r.Context(), wss, websocket.MessageBinary) + sc <- conn + close(sc) // there can be only one + + // hold open until context canceled + <-ctx.Done() + })) + t.Cleanup(s.Close) + // nolint: bodyclose + wsc, _, err := websocket.Dial(ctx, s.URL, nil) + require.NoError(t, err) + client = websocket.NetConn(ctx, wsc, websocket.MessageBinary) + server, ok := <-sc + require.True(t, ok) + return client, server +}