mirror of https://github.com/coder/coder.git
fix: stop sending DeleteTailnetPeer when coordinator is unhealthy (#12925)
Fixes #12923. Prevents Coordinate peer connections from generating spurious database queries like DeleteTailnetPeer when the coordinator is unhealthy. It does this by checking the health of the querier before accepting a connection, rather than unconditionally accepting it only for it to get swatted down later.
This commit is contained in:
parent
a607d5610e
commit
06eae954c9
|
@ -231,6 +231,17 @@ func (c *pgCoord) Coordinate(
|
|||
logger := c.logger.With(slog.F("peer_id", id))
|
||||
reqs := make(chan *proto.CoordinateRequest, agpl.RequestBufferSize)
|
||||
resps := make(chan *proto.CoordinateResponse, agpl.ResponseBufferSize)
|
||||
if !c.querier.isHealthy() {
|
||||
// If the coordinator is unhealthy, we don't want to hook this Coordinate call up to the
|
||||
// binder, as that can cause an unnecessary call to DeleteTailnetPeer when the connIO is
|
||||
// closed. Instead, we just close the response channel and bail out.
|
||||
// c.f. https://github.com/coder/coder/issues/12923
|
||||
c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
|
||||
slog.F("peer_id", id),
|
||||
)
|
||||
close(resps)
|
||||
return reqs, resps
|
||||
}
|
||||
cIO := newConnIO(c.ctx, ctx, logger, c.bindings, c.tunnelerCh, reqs, resps, id, name, a)
|
||||
err := agpl.SendCtx(c.ctx, c.newConnections, cIO)
|
||||
if err != nil {
|
||||
|
@ -842,7 +853,12 @@ func (q *querier) newConn(c *connIO) {
|
|||
defer q.mu.Unlock()
|
||||
if !q.healthy {
|
||||
err := c.Close()
|
||||
q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
|
||||
// This can only happen during a narrow window where we were healthy
|
||||
// when pgCoord checked before accepting the connection, but now are
|
||||
// unhealthy now that we get around to processing it. Seeing a small
|
||||
// number of these logs is not worrying, but a large number probably
|
||||
// indicates something is amiss.
|
||||
q.logger.Warn(q.ctx, "closed incoming connection while unhealthy",
|
||||
slog.Error(err),
|
||||
slog.F("peer_id", c.UniqueID()),
|
||||
)
|
||||
|
@ -865,6 +881,12 @@ func (q *querier) newConn(c *connIO) {
|
|||
})
|
||||
}
|
||||
|
||||
func (q *querier) isHealthy() bool {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
return q.healthy
|
||||
}
|
||||
|
||||
func (q *querier) cleanupConn(c *connIO) {
|
||||
logger := q.logger.With(slog.F("peer_id", c.UniqueID()))
|
||||
q.mu.Lock()
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
"golang.org/x/xerrors"
|
||||
gProto "google.golang.org/protobuf/proto"
|
||||
|
||||
"cdr.dev/slog"
|
||||
|
@ -21,6 +22,8 @@ import (
|
|||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmock"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/database/pubsub"
|
||||
agpl "github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/tailnet/proto"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
@ -291,3 +294,51 @@ func TestGetDebug(t *testing.T) {
|
|||
require.Equal(t, peerID, debug.Tunnels[0].SrcID)
|
||||
require.Equal(t, dstID, debug.Tunnels[0].DstID)
|
||||
}
|
||||
|
||||
// TestPGCoordinatorUnhealthy tests that when the coordinator fails to send heartbeats and is
// unhealthy it disconnects any peers and does not send any extraneous database queries.
func TestPGCoordinatorUnhealthy(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	// IgnoreErrors: the coordinator is expected to log errors while its
	// heartbeats are failing, so error-level logs must not fail the test.
	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)

	ctrl := gomock.NewController(t)
	mStore := dbmock.NewMockStore(ctrl)
	ps := pubsub.NewInMemory()

	// after 3 failed heartbeats, the coordinator is unhealthy
	mStore.EXPECT().
		UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
		MinTimes(3).
		Return(database.TailnetCoordinator{}, xerrors.New("badness"))
	mStore.EXPECT().
		DeleteCoordinator(gomock.Any(), gomock.Any()).
		Times(1).
		Return(nil)
	// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
	// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
	// (No EXPECT for DeleteTailnetPeer is registered, so any such call fails the mock.)

	// these cleanup queries run, but we don't care for this test
	mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
	mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
	mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)

	coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore)
	require.NoError(t, err)

	// Wait for the failed heartbeats above to flip the querier unhealthy.
	require.Eventually(t, func() bool {
		return !coordinator.querier.isHealthy()
	}, testutil.WaitShort, testutil.IntervalFast)

	// Arbitrary fixed peer ID; any non-zero UUID works here.
	pID := uuid.UUID{5}
	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
	// An unhealthy coordinator must immediately close the response channel
	// rather than hooking the connection up (a closed channel yields nil).
	resp := testutil.RequireRecvCtx(ctx, t, resps)
	require.Nil(t, resp, "channel should be closed")

	// give the coordinator some time to process any pending work. We are
	// testing here that a database call is absent, so we don't want to race to
	// shut down the test.
	time.Sleep(testutil.IntervalMedium)
	_ = coordinator.Close()
	require.Eventually(t, ctrl.Satisfied, testutil.WaitShort, testutil.IntervalFast)
}
|
||||
|
|
Loading…
Reference in New Issue