chore: add nodeUpdater to tailnet (#11539)

Adds a nodeUpdater component, which serves a similar role to configMaps, but tracks information from tailscale going out to the coordinator as node updates.  This first PR just handles netInfo, subsequent PRs will
handle DERP forced websockets, endpoints, and addresses.
This commit is contained in:
Spike Curtis 2024-01-11 09:29:42 +04:00 committed by GitHub
parent 7005fb1b2f
commit 8701dbc874
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 256 additions and 8 deletions

View File

@ -48,13 +48,17 @@ const (
closed
)
type configMaps struct {
type phased struct {
sync.Cond
phase phase
}
type configMaps struct {
phased
netmapDirty bool
derpMapDirty bool
filterDirty bool
closing bool
phase phase
engine engineConfigurable
static netmap.NetworkMap
@ -71,7 +75,7 @@ type configMaps struct {
func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
pubKey := nodeKey.Public()
c := &configMaps{
Cond: *(sync.NewCond(&sync.Mutex{})),
phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
logger: logger,
engine: engine,
static: netmap.NetworkMap{

View File

@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs)
defer uut.close()
requireNeverConfigures(ctx, t, uut)
requireNeverConfigures(ctx, t, &uut.phased)
uut.setAddresses(addrs)
@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) {
defer uut.close()
// Then: we don't configure
requireNeverConfigures(ctx, t, uut)
requireNeverConfigures(ctx, t, &uut.phased)
p1ID := uuid.UUID{1}
p1Node := newTestNode(1)
@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) {
uut.L.Unlock()
// Then: we don't configure
requireNeverConfigures(ctx, t, uut)
requireNeverConfigures(ctx, t, &uut.phased)
// When we set blockEndpoints to true
uut.setBlockEndpoints(true)
@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
defer uut.close()
// Then: we don't configure
requireNeverConfigures(ctx, t, uut)
requireNeverConfigures(ctx, t, &uut.phased)
// Given: no known peers
go func() {
@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail
return nil
}
func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) {
func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) {
t.Helper()
waiting := make(chan struct{})
go func() {

134
tailnet/node.go Normal file
View File

@ -0,0 +1,134 @@
package tailnet
import (
"context"
"net/netip"
"sync"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database/dbtime"
)
type nodeUpdater struct {
phased
dirty bool
closing bool
// static
logger slog.Logger
id tailcfg.NodeID
key key.NodePublic
discoKey key.DiscoPublic
callback func(n *Node)
// dynamic
preferredDERP int
derpLatency map[string]float64
derpForcedWebsockets map[int]string
endpoints []string
addresses []netip.Prefix
}
// updateLoop waits until the config is dirty and then calls the callback with the newest node.
// It is intended only to be called internally, and shuts down when close() is called.
func (u *nodeUpdater) updateLoop() {
u.L.Lock()
defer u.L.Unlock()
defer func() {
u.phase = closed
u.Broadcast()
}()
for {
for !(u.closing || u.dirty) {
u.phase = idle
u.Wait()
}
if u.closing {
return
}
node := u.nodeLocked()
u.dirty = false
u.phase = configuring
u.Broadcast()
// We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending
// the node without this, and we can save ourselves from churn in the tailscale/wireguard
// layer.
if node.PreferredDERP == 0 {
u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node))
continue
}
u.L.Unlock()
u.callback(node)
u.L.Lock()
}
}
// close closes the nodeUpdate and stops it calling the node callback
func (u *nodeUpdater) close() {
u.L.Lock()
defer u.L.Unlock()
u.closing = true
u.Broadcast()
for u.phase != closed {
u.Wait()
}
}
func newNodeUpdater(
logger slog.Logger, callback func(n *Node),
id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic,
) *nodeUpdater {
u := &nodeUpdater{
phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
logger: logger,
id: id,
key: np,
discoKey: dp,
callback: callback,
}
go u.updateLoop()
return u
}
// nodeLocked returns the current best node information. u.L must be held.
func (u *nodeUpdater) nodeLocked() *Node {
return &Node{
ID: u.id,
AsOf: dbtime.Now(),
Key: u.key,
Addresses: slices.Clone(u.addresses),
AllowedIPs: slices.Clone(u.addresses),
DiscoKey: u.discoKey,
Endpoints: slices.Clone(u.endpoints),
PreferredDERP: u.preferredDERP,
DERPLatency: maps.Clone(u.derpLatency),
DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets),
}
}
// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST
// NOT be held.
func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) {
u.L.Lock()
defer u.L.Unlock()
dirty := false
if u.preferredDERP != ni.PreferredDERP {
dirty = true
u.preferredDERP = ni.PreferredDERP
}
if !maps.Equal(u.derpLatency, ni.DERPLatency) {
dirty = true
u.derpLatency = ni.DERPLatency
}
if dirty {
u.dirty = true
u.Broadcast()
}
}

View File

@ -0,0 +1,110 @@
package tailnet
import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/testutil"
)
func TestNodeUpdater_setNetInfo_different(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
id := tailcfg.NodeID(1)
nodeKey := key.NewNode().Public()
discoKey := key.NewDisco().Public()
nodeCh := make(chan *Node)
goCh := make(chan struct{})
uut := newNodeUpdater(
logger,
func(n *Node) {
nodeCh <- n
<-goCh
},
id, nodeKey, discoKey,
)
defer uut.close()
dl := map[string]float64{"1": 0.025}
uut.setNetInfo(&tailcfg.NetInfo{
PreferredDERP: 1,
DERPLatency: dl,
})
node := testutil.RequireRecvCtx(ctx, t, nodeCh)
require.Equal(t, nodeKey, node.Key)
require.Equal(t, discoKey, node.DiscoKey)
require.Equal(t, 1, node.PreferredDERP)
require.True(t, maps.Equal(dl, node.DERPLatency))
// Send in second update to test getting updates in the middle of the
// callback
uut.setNetInfo(&tailcfg.NetInfo{
PreferredDERP: 2,
DERPLatency: dl,
})
close(goCh) // allows callback to complete
node = testutil.RequireRecvCtx(ctx, t, nodeCh)
require.Equal(t, nodeKey, node.Key)
require.Equal(t, discoKey, node.DiscoKey)
require.Equal(t, 2, node.PreferredDERP)
require.True(t, maps.Equal(dl, node.DERPLatency))
done := make(chan struct{})
go func() {
defer close(done)
uut.close()
}()
_ = testutil.RequireRecvCtx(ctx, t, done)
}
func TestNodeUpdater_setNetInfo_same(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
id := tailcfg.NodeID(1)
nodeKey := key.NewNode().Public()
discoKey := key.NewDisco().Public()
nodeCh := make(chan *Node)
goCh := make(chan struct{})
uut := newNodeUpdater(
logger,
func(n *Node) {
nodeCh <- n
<-goCh
},
id, nodeKey, discoKey,
)
defer uut.close()
// Then: we don't configure
requireNeverConfigures(ctx, t, &uut.phased)
// Given: preferred DERP and latency already set
dl := map[string]float64{"1": 0.025}
uut.L.Lock()
uut.preferredDERP = 1
uut.derpLatency = maps.Clone(dl)
uut.L.Unlock()
// When: new update with same info
uut.setNetInfo(&tailcfg.NetInfo{
PreferredDERP: 1,
DERPLatency: dl,
})
done := make(chan struct{})
go func() {
defer close(done)
uut.close()
}()
_ = testutil.RequireRecvCtx(ctx, t, done)
}