coder/tailnet/node.go

package tailnet

import (
	"context"
	"net/netip"
	"sync"
	"time"

	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"
	"tailscale.com/wgengine"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/coderd/database/dbtime"
)

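// nodeUpdater tracks the state of the local node (addresses, endpoints, DERP
// preferences) and, whenever that state changes, delivers a fresh *Node to the
// configured callback from its updateLoop goroutine.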
type nodeUpdater struct {
	phased
	dirty   bool
	closing bool

	// static
	logger   slog.Logger
	id       tailcfg.NodeID
	key      key.NodePublic
	discoKey key.DiscoPublic
	callback func(n *Node)

	// dynamic
	preferredDERP        int
	derpLatency          map[string]float64
	derpForcedWebsockets map[int]string
	endpoints            []string
	addresses            []netip.Prefix
	lastStatus           time.Time
	blockEndpoints       bool
	sentNode             bool // for PeerDiagnostics
}

// updateLoop waits until the config is dirty and then calls the callback with the newest node.
// It is intended only to be called internally, and shuts down when close() is called.
func (u *nodeUpdater) updateLoop() {
	u.L.Lock()
	defer u.L.Unlock()
	defer func() {
		u.phase = closed
		u.Broadcast()
	}()
	for {
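		// Wait in the idle phase until there is new state to send or the
		// updater is being closed.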
		for !(u.closing || u.dirty) {
			u.phase = idle
			u.Wait()
		}
		if u.closing {
			u.logger.Debug(context.Background(), "closing nodeUpdater updateLoop")
			return
		}
		u.dirty = false
		u.phase = configuring
		u.Broadcast()
		callback := u.callback
		if callback == nil {
			u.logger.Debug(context.Background(), "skipped sending node; no node callback")
			continue
		}
		// We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending
		// the node without this, and we can save ourselves from churn in the tailscale/wireguard
		// layer.
		node := u.nodeLocked()
		if node.PreferredDERP == 0 {
			u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node))
			continue
		}
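		// Release the lock while calling the callback so a slow callback cannot
		// block other state updates; the loop re-checks the dirty flag after the
		// lock is reacquired, so changes made in the meantime are not lost.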
		u.L.Unlock()
		u.logger.Debug(context.Background(), "calling nodeUpdater callback", slog.F("node", node))
		callback(node)
		u.L.Lock()
		u.sentNode = true
	}
}

// close closes the nodeUpdater and stops it from calling the node callback.
func (u *nodeUpdater) close() {
	u.L.Lock()
	defer u.L.Unlock()
	u.closing = true
	u.Broadcast()
	for u.phase != closed {
		u.Wait()
	}
}

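// newNodeUpdater constructs a nodeUpdater and starts its updateLoop goroutine.
// The callback may be nil and set later via setCallback. A typical wiring
// (illustrative only; names other than the functions defined in this file are
// hypothetical) looks like:
//
//	updater := newNodeUpdater(logger, nil, nodeID, nodeKey, discoKey)
//	updater.setCallback(func(n *Node) { /* handle the updated node */ })
//	defer updater.close()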
func newNodeUpdater(
	logger slog.Logger, callback func(n *Node),
	id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic,
) *nodeUpdater {
	u := &nodeUpdater{
		phased:               phased{Cond: *(sync.NewCond(&sync.Mutex{}))},
		logger:               logger,
		id:                   id,
		key:                  np,
		discoKey:             dp,
		derpForcedWebsockets: make(map[int]string),
		callback:             callback,
	}
	go u.updateLoop()
	return u
}

// nodeLocked returns the current best node information. u.L must be held.
func (u *nodeUpdater) nodeLocked() *Node {
	var endpoints []string
	if !u.blockEndpoints {
		endpoints = slices.Clone(u.endpoints)
	}
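	// Clone the slices and maps so the returned Node is a snapshot the caller
	// can safely use after u.L is released.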
	return &Node{
		ID:                  u.id,
		AsOf:                dbtime.Now(),
		Key:                 u.key,
		Addresses:           slices.Clone(u.addresses),
		AllowedIPs:          slices.Clone(u.addresses),
		DiscoKey:            u.discoKey,
		Endpoints:           endpoints,
		PreferredDERP:       u.preferredDERP,
		DERPLatency:         maps.Clone(u.derpLatency),
		DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets),
	}
}

// setNetInfo processes a NetInfo update from the wireguard engine. u.L MUST
// NOT be held.
func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) {
	u.L.Lock()
	defer u.L.Unlock()
	dirty := false
	if u.preferredDERP != ni.PreferredDERP {
		dirty = true
		u.preferredDERP = ni.PreferredDERP
		u.logger.Debug(context.Background(), "new preferred DERP",
			slog.F("preferred_derp", u.preferredDERP))
	}
	if !maps.Equal(u.derpLatency, ni.DERPLatency) {
		dirty = true
		u.derpLatency = ni.DERPLatency
	}
	if dirty {
		u.dirty = true
		u.Broadcast()
	}
}

// setDERPForcedWebsocket handles callbacks from the magicConn about DERP regions that are forced to
// use websockets (instead of Upgrade: derp). This information is for debugging only.
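// u.L MUST NOT be held.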
func (u *nodeUpdater) setDERPForcedWebsocket(region int, reason string) {
	u.L.Lock()
	defer u.L.Unlock()
	dirty := u.derpForcedWebsockets[region] != reason
	u.derpForcedWebsockets[region] = reason
	if dirty {
		u.dirty = true
		u.Broadcast()
	}
}

// setStatus handles the status callback from the wireguard engine to learn about new endpoints
// (e.g. discovered by STUN). u.L MUST NOT be held.
func (u *nodeUpdater) setStatus(s *wgengine.Status, err error) {
	u.logger.Debug(context.Background(), "wireguard status", slog.F("status", s), slog.Error(err))
	if err != nil {
		return
	}
	u.L.Lock()
	defer u.L.Unlock()
	if s.AsOf.Before(u.lastStatus) {
		// Don't process outdated status!
		return
	}
	u.lastStatus = s.AsOf
	endpoints := make([]string, len(s.LocalAddrs))
	for i, ep := range s.LocalAddrs {
		endpoints[i] = ep.Addr.String()
	}
	if slices.Equal(endpoints, u.endpoints) {
		// No need to update the node if nothing changed!
		return
	}
	u.endpoints = endpoints
	u.dirty = true
	u.Broadcast()
}

// setAddresses sets the local addresses for the node. u.L MUST NOT be held.
func (u *nodeUpdater) setAddresses(ips []netip.Prefix) {
	u.L.Lock()
	defer u.L.Unlock()
	if d := prefixesDifferent(u.addresses, ips); !d {
		return
	}
	u.addresses = make([]netip.Prefix, len(ips))
	copy(u.addresses, ips)
	u.dirty = true
	u.Broadcast()
}

// setCallback sets the callback for node changes. It also triggers a call
// for the current node immediately. u.L MUST NOT be held.
func (u *nodeUpdater) setCallback(callback func(node *Node)) {
	u.L.Lock()
	defer u.L.Unlock()
	u.callback = callback
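	// Mark the state dirty and clear sentNode so the new callback receives the
	// current node and PeerDiagnostics reflect whether it has been sent yet.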
	u.dirty = true
	u.sentNode = false
	u.Broadcast()
}

// setBlockEndpoints sets whether we block reporting Node endpoints. u.L MUST NOT
// be held.
// nolint: revive
func (u *nodeUpdater) setBlockEndpoints(blockEndpoints bool) {
	u.L.Lock()
	defer u.L.Unlock()
	if u.blockEndpoints == blockEndpoints {
		return
	}
	u.dirty = true
	u.blockEndpoints = blockEndpoints
	u.Broadcast()
}

// fillPeerDiagnostics fills out the PeerDiagnostics with PreferredDERP and SentNode.
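// u.L MUST NOT be held.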
func (u *nodeUpdater) fillPeerDiagnostics(d *PeerDiagnostics) {
	u.L.Lock()
	defer u.L.Unlock()
	d.PreferredDERP = u.preferredDERP
	d.SentNode = u.sentNode
}

// getBlockEndpoints returns the value of the most recent setBlockEndpoints
// call.
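// u.L MUST NOT be held.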
func (u *nodeUpdater) getBlockEndpoints() bool {
	u.L.Lock()
	defer u.L.Unlock()
	return u.blockEndpoints
}