diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index 2e5e019bf2..028ba7dfff 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -48,13 +48,17 @@ const ( closed ) -type configMaps struct { +type phased struct { sync.Cond + phase phase +} + +type configMaps struct { + phased netmapDirty bool derpMapDirty bool filterDirty bool closing bool - phase phase engine engineConfigurable static netmap.NetworkMap @@ -71,7 +75,7 @@ type configMaps struct { func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps { pubKey := nodeKey.Public() c := &configMaps{ - Cond: *(sync.NewCond(&sync.Mutex{})), + phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}, logger: logger, engine: engine, static: netmap.NetworkMap{ diff --git a/tailnet/configmaps_internal_test.go b/tailnet/configmaps_internal_test.go index 003ac1b522..334bc43017 100644 --- a/tailnet/configmaps_internal_test.go +++ b/tailnet/configmaps_internal_test.go @@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) { uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs) defer uut.close() - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) uut.setAddresses(addrs) @@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) { defer uut.close() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) p1ID := uuid.UUID{1} p1Node := newTestNode(1) @@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) { uut.L.Unlock() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) // When we set blockEndpoints to true uut.setBlockEndpoints(true) @@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) { defer uut.close() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) // Given: no known peers go func() { @@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail return nil } -func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) { +func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) { t.Helper() waiting := make(chan struct{}) go func() { diff --git a/tailnet/node.go b/tailnet/node.go new file mode 100644 index 0000000000..a9912154d6 --- /dev/null +++ b/tailnet/node.go @@ -0,0 +1,134 @@ +package tailnet + +import ( + "context" + "net/netip" + "sync" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + "tailscale.com/tailcfg" + "tailscale.com/types/key" + + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/database/dbtime" +) + +type nodeUpdater struct { + phased + dirty bool + closing bool + + // static + logger slog.Logger + id tailcfg.NodeID + key key.NodePublic + discoKey key.DiscoPublic + callback func(n *Node) + + // dynamic + preferredDERP int + derpLatency map[string]float64 + derpForcedWebsockets map[int]string + endpoints []string + addresses []netip.Prefix +} + +// updateLoop waits until the config is dirty and then calls the callback with the newest node. +// It is intended only to be called internally, and shuts down when close() is called. +func (u *nodeUpdater) updateLoop() { + u.L.Lock() + defer u.L.Unlock() + defer func() { + u.phase = closed + u.Broadcast() + }() + for { + for !(u.closing || u.dirty) { + u.phase = idle + u.Wait() + } + if u.closing { + return + } + node := u.nodeLocked() + u.dirty = false + u.phase = configuring + u.Broadcast() + + // We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending + // the node without this, and we can save ourselves from churn in the tailscale/wireguard + // layer. + if node.PreferredDERP == 0 { + u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node)) + continue + } + + u.L.Unlock() + u.callback(node) + u.L.Lock() + } +} + +// close closes the nodeUpdate and stops it calling the node callback +func (u *nodeUpdater) close() { + u.L.Lock() + defer u.L.Unlock() + u.closing = true + u.Broadcast() + for u.phase != closed { + u.Wait() + } +} + +func newNodeUpdater( + logger slog.Logger, callback func(n *Node), + id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic, +) *nodeUpdater { + u := &nodeUpdater{ + phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}, + logger: logger, + id: id, + key: np, + discoKey: dp, + callback: callback, + } + go u.updateLoop() + return u +} + +// nodeLocked returns the current best node information. u.L must be held. +func (u *nodeUpdater) nodeLocked() *Node { + return &Node{ + ID: u.id, + AsOf: dbtime.Now(), + Key: u.key, + Addresses: slices.Clone(u.addresses), + AllowedIPs: slices.Clone(u.addresses), + DiscoKey: u.discoKey, + Endpoints: slices.Clone(u.endpoints), + PreferredDERP: u.preferredDERP, + DERPLatency: maps.Clone(u.derpLatency), + DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets), + } +} + +// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST +// NOT be held. +func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) { + u.L.Lock() + defer u.L.Unlock() + dirty := false + if u.preferredDERP != ni.PreferredDERP { + dirty = true + u.preferredDERP = ni.PreferredDERP + } + if !maps.Equal(u.derpLatency, ni.DERPLatency) { + dirty = true + u.derpLatency = ni.DERPLatency + } + if dirty { + u.dirty = true + u.Broadcast() + } +} diff --git a/tailnet/node_internal_test.go b/tailnet/node_internal_test.go new file mode 100644 index 0000000000..27dc5609d1 --- /dev/null +++ b/tailnet/node_internal_test.go @@ -0,0 +1,110 @@ +package tailnet + +import ( + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + "tailscale.com/tailcfg" + "tailscale.com/types/key" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/testutil" +) + +func TestNodeUpdater_setNetInfo_different(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + id := tailcfg.NodeID(1) + nodeKey := key.NewNode().Public() + discoKey := key.NewDisco().Public() + nodeCh := make(chan *Node) + goCh := make(chan struct{}) + uut := newNodeUpdater( + logger, + func(n *Node) { + nodeCh <- n + <-goCh + }, + id, nodeKey, discoKey, + ) + defer uut.close() + + dl := map[string]float64{"1": 0.025} + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 1, + DERPLatency: dl, + }) + + node := testutil.RequireRecvCtx(ctx, t, nodeCh) + require.Equal(t, nodeKey, node.Key) + require.Equal(t, discoKey, node.DiscoKey) + require.Equal(t, 1, node.PreferredDERP) + require.True(t, maps.Equal(dl, node.DERPLatency)) + + // Send in second update to test getting updates in the middle of the + // callback + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 2, + DERPLatency: dl, + }) + close(goCh) // allows callback to complete + + node = testutil.RequireRecvCtx(ctx, t, nodeCh) + require.Equal(t, nodeKey, node.Key) + require.Equal(t, discoKey, node.DiscoKey) + require.Equal(t, 2, node.PreferredDERP) + require.True(t, maps.Equal(dl, node.DERPLatency)) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + +func TestNodeUpdater_setNetInfo_same(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + id := tailcfg.NodeID(1) + nodeKey := key.NewNode().Public() + discoKey := key.NewDisco().Public() + nodeCh := make(chan *Node) + goCh := make(chan struct{}) + uut := newNodeUpdater( + logger, + func(n *Node) { + nodeCh <- n + <-goCh + }, + id, nodeKey, discoKey, + ) + defer uut.close() + + // Then: we don't configure + requireNeverConfigures(ctx, t, &uut.phased) + + // Given: preferred DERP and latency already set + dl := map[string]float64{"1": 0.025} + uut.L.Lock() + uut.preferredDERP = 1 + uut.derpLatency = maps.Clone(dl) + uut.L.Unlock() + + // When: new update with same info + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 1, + DERPLatency: dl, + }) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +}