fix: fix TestPGCoordinatorDual_Mainline flake (#8228)

* fix TestPGCoordinatorDual_Mainline flake

Signed-off-by: Spike Curtis <spike@coder.com>

* use slices.Contains instead of local function

Signed-off-by: Spike Curtis <spike@coder.com>

---------

Signed-off-by: Spike Curtis <spike@coder.com>
Author: Spike Curtis <spike@coder.com>, committed by GitHub
Date: 2023-06-28 11:37:45 +04:00
parent df95cf7ab2
commit c0a01ec81c
2 changed files with 47 additions and 59 deletions
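
What changed, in short: the tests previously asserted on a single recvNodes result, which can be an intermediate update rather than the final state; they now call an assertEventuallyHasDERPs helper that keeps receiving updates until one has the expected number of nodes and then checks DERP membership with slices.Contains. The sketch below is a condensed, self-contained illustration of that eventually-style assertion pattern, not the repo's API: eventuallyHasDERPs and the channel of []int are stand-ins for testConn.recvNodes and agpl.Node.PreferredDERP, and unlike the helper added in the diff (which retries only when the node count differs), this version also keeps waiting when an expected DERP is missing.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/exp/slices"
)

// eventuallyHasDERPs keeps draining updates until one carries exactly the expected
// DERPs or the context expires. The channel of []int stands in for the repo's
// testConn.recvNodes and agpl.Node.PreferredDERP.
func eventuallyHasDERPs(ctx context.Context, updates <-chan []int, expected ...int) error {
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for DERPs %v", expected)
		case derps := <-updates:
			if len(derps) != len(expected) {
				continue // intermediate update; keep waiting
			}
			ok := true
			for _, e := range expected {
				if !slices.Contains(derps, e) {
					ok = false
					break
				}
			}
			if ok {
				return nil
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	updates := make(chan []int, 2)
	updates <- []int{11}     // an intermediate update
	updates <- []int{21, 11} // the state the test is waiting for
	fmt.Println(eventuallyHasDERPs(ctx, updates, 21, 11)) // <nil>
}

Polling until the expected state shows up is the idea behind the fix: extra or intermediate node updates get drained instead of tripping the assertion.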


@@ -14,6 +14,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/goleak"
+	"golang.org/x/exp/slices"
 	"golang.org/x/xerrors"
 	"cdr.dev/slog"
@@ -203,11 +204,9 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
 	client := newTestClient(t, coordinator, agent.id)
 	defer client.close()
-	nodes := client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 10)
+	assertEventuallyHasDERPs(ctx, t, client, 10)
 	client.sendNode(&agpl.Node{PreferredDERP: 11})
-	nodes = agent.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 11)
+	assertEventuallyHasDERPs(ctx, t, agent, 11)
 	// simulate a second coordinator via DB calls only --- our goal is to test broken heart-beating, so we can't use a
 	// real coordinator
@@ -233,8 +232,7 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
 	}()
 	fCoord2.heartbeat()
 	fCoord2.agentNode(agent.id, &agpl.Node{PreferredDERP: 12})
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 12)
+	assertEventuallyHasDERPs(ctx, t, client, 12)
 	fCoord3 := &fakeCoordinator{
 		ctx: ctx,
@@ -245,24 +243,20 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
 	start := time.Now()
 	fCoord3.heartbeat()
 	fCoord3.agentNode(agent.id, &agpl.Node{PreferredDERP: 13})
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 13)
+	assertEventuallyHasDERPs(ctx, t, client, 13)
 	// when the fCoord3 misses enough heartbeats, the real coordinator should send an update with the
 	// node from fCoord2 for the agent.
-	nodes = client.recvNodes(ctx, t)
+	assertEventuallyHasDERPs(ctx, t, client, 12)
 	assert.Greater(t, time.Since(start), tailnet.HeartbeatPeriod*tailnet.MissedHeartbeats)
-	assertHasDERPs(t, nodes, 12)
 	// stop fCoord2 heartbeats, which should cause us to revert to the original agent mapping
 	cancel2()
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 10)
+	assertEventuallyHasDERPs(ctx, t, client, 10)
 	// send fCoord3 heartbeat, which should trigger us to consider that mapping valid again.
 	fCoord3.heartbeat()
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 13)
+	assertEventuallyHasDERPs(ctx, t, client, 13)
 	err = agent.close()
 	require.NoError(t, err)
@@ -358,33 +352,24 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	defer client22.close()
 	client11.sendNode(&agpl.Node{PreferredDERP: 11})
-	nodes := agent1.recvNodes(ctx, t)
-	assert.Len(t, nodes, 1)
-	assertHasDERPs(t, nodes, 11)
+	assertEventuallyHasDERPs(ctx, t, agent1, 11)
 	client21.sendNode(&agpl.Node{PreferredDERP: 21})
-	nodes = agent1.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 21, 11)
+	assertEventuallyHasDERPs(ctx, t, agent1, 21, 11)
 	client22.sendNode(&agpl.Node{PreferredDERP: 22})
-	nodes = agent2.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 22)
+	assertEventuallyHasDERPs(ctx, t, agent2, 22)
 	agent2.sendNode(&agpl.Node{PreferredDERP: 2})
-	nodes = client22.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 2)
-	nodes = client12.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 2)
+	assertEventuallyHasDERPs(ctx, t, client22, 2)
+	assertEventuallyHasDERPs(ctx, t, client12, 2)
 	client12.sendNode(&agpl.Node{PreferredDERP: 12})
-	nodes = agent2.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 12, 22)
+	assertEventuallyHasDERPs(ctx, t, agent2, 12, 22)
 	agent1.sendNode(&agpl.Node{PreferredDERP: 1})
-	nodes = client21.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 1)
-	nodes = client11.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 1)
+	assertEventuallyHasDERPs(ctx, t, client21, 1)
+	assertEventuallyHasDERPs(ctx, t, client11, 1)
 	// let's close coord2
 	err = coord2.Close()
@@ -402,8 +387,7 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	// In this case the update is superfluous because client11's node hasn't changed, and agents don't deprogram clients
 	// from the dataplane even if they are missing. Suppressing this kind of update would require the coordinator to
 	// store all the data its sent to each connection, so we don't bother.
-	nodes = agent1.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 11)
+	assertEventuallyHasDERPs(ctx, t, agent1, 11)
 	// note that although agent2 is disconnected, client12 does NOT get an update because we suppress empty updates.
 	// (Its easy to tell these are superfluous.)
@@ -492,19 +476,15 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
 	defer client.close()
 	client.sendNode(&agpl.Node{PreferredDERP: 3})
-	nodes := agent1.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 3)
-	nodes = agent2.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 3)
+	assertEventuallyHasDERPs(ctx, t, agent1, 3)
+	assertEventuallyHasDERPs(ctx, t, agent2, 3)
 	agent1.sendNode(&agpl.Node{PreferredDERP: 1})
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 1)
+	assertEventuallyHasDERPs(ctx, t, client, 1)
 	// agent2's update overrides agent1 because it is newer
 	agent2.sendNode(&agpl.Node{PreferredDERP: 2})
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 2)
+	assertEventuallyHasDERPs(ctx, t, client, 2)
 	// agent2 disconnects, and we should revert back to agent1
 	err = agent2.close()
@@ -512,16 +492,13 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
 	err = agent2.recvErr(ctx, t)
 	require.ErrorIs(t, err, io.ErrClosedPipe)
 	agent2.waitForClose(ctx, t)
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 1)
+	assertEventuallyHasDERPs(ctx, t, client, 1)
 	agent1.sendNode(&agpl.Node{PreferredDERP: 11})
-	nodes = client.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 11)
+	assertEventuallyHasDERPs(ctx, t, client, 11)
 	client.sendNode(&agpl.Node{PreferredDERP: 31})
-	nodes = agent1.recvNodes(ctx, t)
-	assertHasDERPs(t, nodes, 31)
+	assertEventuallyHasDERPs(ctx, t, agent1, 31)
 	err = agent1.close()
 	require.NoError(t, err)
@@ -625,17 +602,27 @@ func newTestClient(t *testing.T, coord agpl.Coordinator, agentID uuid.UUID, id .
 	return c
 }
-func assertHasDERPs(t *testing.T, nodes []*agpl.Node, expected ...int) {
-	if !assert.Len(t, nodes, len(expected), "expected %d node(s), got %d", len(expected), len(nodes)) {
+func assertEventuallyHasDERPs(ctx context.Context, t *testing.T, c *testConn, expected ...int) {
+	t.Helper()
+	for {
+		nodes := c.recvNodes(ctx, t)
+		if len(nodes) != len(expected) {
+			t.Logf("expected %d, got %d nodes", len(expected), len(nodes))
+			continue
+		}
+		derps := make([]int, 0, len(nodes))
+		for _, n := range nodes {
+			derps = append(derps, n.PreferredDERP)
+		}
+		for _, e := range expected {
+			if !slices.Contains(derps, e) {
+				t.Logf("expected DERP %d to be in %v", e, derps)
+				continue
+			}
+		}
 		return
 	}
-	derps := make([]int, 0, len(nodes))
-	for _, n := range nodes {
-		derps = append(derps, n.PreferredDERP)
-	}
-	for _, e := range expected {
-		assert.Contains(t, derps, e, "expected DERP %v, got %v", e, derps)
-	}
 }
 func assertEventuallyNoAgents(ctx context.Context, t *testing.T, store database.Store, agentID uuid.UUID) {
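
The second file's hunks below touch TrackedConn.SendUpdates: the debug logs now record the serialized payload (string(data)) instead of the raw nodes value, while the duplicate-update suppression itself is unchanged. As a rough, self-contained sketch of that suppression step, assuming updates are JSON-marshaled and compared byte-for-byte against the last payload written (node and shouldSend are illustrative names, not the repo's types):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// node stands in for agpl.Node; only the field these tests care about is shown.
type node struct {
	PreferredDERP int `json:"preferred_derp"`
}

// shouldSend mirrors the duplicate-suppression step in TrackedConn.SendUpdates:
// marshal the update and skip the write when the payload matches the last one sent.
func shouldSend(lastData []byte, nodes []*node) (data []byte, send bool, err error) {
	data, err = json.Marshal(nodes)
	if err != nil {
		return nil, false, err
	}
	if bytes.Equal(lastData, data) {
		// Log the serialized payload, as the changed debug lines now do.
		fmt.Printf("skipping duplicate update: nodes=%s\n", string(data))
		return data, false, nil
	}
	return data, true, nil
}

func main() {
	last, _, _ := shouldSend(nil, []*node{{PreferredDERP: 11}})
	_, send, _ := shouldSend(last, []*node{{PreferredDERP: 11}})
	fmt.Println("send again:", send) // false: identical payload is suppressed
}

Because the comparison happens on the marshaled bytes, two updates that serialize identically are treated as duplicates even if they are distinct values in memory.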


@@ -227,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {
 				return
 			}
 			if bytes.Equal(t.lastData, data) {
-				t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", nodes))
+				t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", string(data)))
 				continue
 			}
@@ -243,11 +243,12 @@ func (t *TrackedConn) SendUpdates() {
 			_, err = t.conn.Write(data)
 			if err != nil {
 				// often, this is just because the connection is closed/broken, so only log at debug.
-				t.logger.Debug(t.ctx, "could not write nodes to connection", slog.Error(err), slog.F("nodes", nodes))
+				t.logger.Debug(t.ctx, "could not write nodes to connection",
+					slog.Error(err), slog.F("nodes", string(data)))
 				_ = t.Close()
 				return
 			}
-			t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes))
+			t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", string(data)))
 			// nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
 			// *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()