coder/tailnet/node_internal_test.go

package tailnet

import (
	"net/netip"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
	"golang.org/x/xerrors"
	"tailscale.com/tailcfg"
	"tailscale.com/types/key"
	"tailscale.com/wgengine"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"

	"github.com/coder/coder/v2/testutil"
)
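
// TestNodeUpdater_setNetInfo_different verifies that setNetInfo sends a node
// update when the preferred DERP or latency changes, including when a second
// update arrives while the callback for the first is still in progress.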
func TestNodeUpdater_setNetInfo_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	goCh := make(chan struct{})
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
			<-goCh
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	dl := map[string]float64{"1": 0.025}
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 1, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	// Send in a second update to test getting updates in the middle of the
	// callback.
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 2,
		DERPLatency:   dl,
	})
	close(goCh) // allows callback to complete

	node = testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 2, node.PreferredDERP)
	require.True(t, maps.Equal(dl, node.DERPLatency))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
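
// TestNodeUpdater_setNetInfo_same verifies that setNetInfo does not send a
// node update when the preferred DERP and latency are unchanged.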
func TestNodeUpdater_setNetInfo_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP and latency already set
	dl := map[string]float64{"1": 0.025}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.derpLatency = maps.Clone(dl)
	uut.L.Unlock()

	// When: new update with same info
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
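
// TestNodeUpdater_setDERPForcedWebsocket_different verifies that recording a
// new forced-websocket reason for a DERP region sends a node update.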
func TestNodeUpdater_setDERPForcedWebsocket_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Given: preferred DERP is 1, so we'll send an update
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.L.Unlock()

	// When: we set a new forced websocket reason
	uut.setDERPForcedWebsocket(1, "test")

	// Then: we receive an update with the reason set
	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.True(t, maps.Equal(map[int]string{1: "test"}, node.DERPForcedWebsocket))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
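
// TestNodeUpdater_setDERPForcedWebsocket_same verifies that re-recording an
// identical forced-websocket reason does not send a node update.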
func TestNodeUpdater_setDERPForcedWebsocket_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change &&
	// reason for region 1 is set to "test"
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.derpForcedWebsockets[1] = "test"
	uut.L.Unlock()

	// When: we set region 1 to "test"
	uut.setDERPForcedWebsocket(1, "test")

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
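
// TestNodeUpdater_setStatus_different verifies that setStatus sends a node
// update with the new endpoints and records the status's AsOf time as
// lastStatus.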
func TestNodeUpdater_setStatus_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Given: preferred DERP is 1, so we'll send an update
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.L.Unlock()

	// When: we set a new status
	asof := time.Date(2024, 1, 10, 8, 0o0, 1, 1, time.UTC)
	uut.setStatus(&wgengine.Status{
		LocalAddrs: []tailcfg.Endpoint{
			{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
		},
		AsOf: asof,
	}, nil)

	// Then: we receive an update with the endpoint
	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, []string{"[fe80::1]:5678"}, node.Endpoints)

	// Then: we store the AsOf time as lastStatus
	uut.L.Lock()
	require.Equal(t, uut.lastStatus, asof)
	uut.L.Unlock()

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
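
// TestNodeUpdater_setStatus_same verifies that setStatus does not send a node
// update when the endpoints are unchanged.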
func TestNodeUpdater_setStatus_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change &&
	// endpoints set to {"[fe80::1]:5678"}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.endpoints = []string{"[fe80::1]:5678"}
	uut.L.Unlock()

	// When: we set a status with endpoints {[fe80::1]:5678}
	uut.setStatus(&wgengine.Status{LocalAddrs: []tailcfg.Endpoint{
		{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
	}}, nil)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
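
// TestNodeUpdater_setStatus_error verifies that a status delivered with an
// error is ignored and does not send a node update.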
func TestNodeUpdater_setStatus_error(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change && empty endpoints
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.L.Unlock()

	// When: we set a status with endpoints {[fe80::1]:5678}, with an error
	uut.setStatus(&wgengine.Status{LocalAddrs: []tailcfg.Endpoint{
		{Addr: netip.MustParseAddrPort("[fe80::1]:5678")},
	}}, xerrors.New("test"))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
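
// TestNodeUpdater_setStatus_outdated verifies that a status whose AsOf time
// is older than the last recorded status is ignored.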
func TestNodeUpdater_setStatus_outdated(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change && lastStatus set ahead
	ahead := time.Date(2024, 1, 10, 8, 0o0, 1, 0, time.UTC)
	behind := time.Date(2024, 1, 10, 8, 0o0, 0, 0, time.UTC)
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.lastStatus = ahead
	uut.L.Unlock()

	// When: we set a status with endpoints {[fe80::1]:5678}, with AsOf set behind
	uut.setStatus(&wgengine.Status{
		LocalAddrs: []tailcfg.Endpoint{{Addr: netip.MustParseAddrPort("[fe80::1]:5678")}},
		AsOf:       behind,
	}, xerrors.New("test"))

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
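
// TestNodeUpdater_setAddresses_different verifies that setAddresses sends a
// node update with the new addresses and allowed IPs.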
func TestNodeUpdater_setAddresses_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Given: preferred DERP is 1, so we'll send an update
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.L.Unlock()

	// When: we set addresses
	addrs := []netip.Prefix{netip.MustParsePrefix("192.168.0.200/32")}
	uut.setAddresses(addrs)

	// Then: we receive an update with the addresses
	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, addrs, node.Addresses)
	require.Equal(t, addrs, node.AllowedIPs)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
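
// TestNodeUpdater_setAddresses_same verifies that setAddresses does not send
// a node update when the addresses are unchanged.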
func TestNodeUpdater_setAddresses_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change &&
	// addrs already set
	addrs := []netip.Prefix{netip.MustParsePrefix("192.168.0.200/32")}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.addresses = slices.Clone(addrs)
	uut.L.Unlock()

	// When: we set addrs
	uut.setAddresses(addrs)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
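
// TestNodeUpdater_setCallback verifies that setting a callback on an updater
// that already has node state immediately sends a node update.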
func TestNodeUpdater_setCallback(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	uut := newNodeUpdater(
		logger,
		nil,
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Given: preferred DERP is 1
	addrs := []netip.Prefix{netip.MustParsePrefix("192.168.0.200/32")}
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.addresses = slices.Clone(addrs)
	uut.L.Unlock()

	// When: we set callback
	nodeCh := make(chan *Node)
	uut.setCallback(func(n *Node) {
		nodeCh <- n
	})

	// Then: we get a node update
	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Equal(t, 1, node.PreferredDERP)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
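
// TestNodeUpdater_setBlockEndpoints_different verifies that toggling
// blockEndpoints sends node updates that omit or include the endpoints
// accordingly.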
func TestNodeUpdater_setBlockEndpoints_different(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Given: preferred DERP is 1, so we'll send an update && some endpoints
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.endpoints = []string{"10.11.12.13:7890"}
	uut.L.Unlock()

	// When: we setBlockEndpoints
	uut.setBlockEndpoints(true)

	// Then: we receive an update without endpoints
	node := testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Len(t, node.Endpoints, 0)

	// When: we unset BlockEndpoints
	uut.setBlockEndpoints(false)

	// Then: we receive an update with endpoints
	node = testutil.RequireRecvCtx(ctx, t, nodeCh)
	require.Equal(t, nodeKey, node.Key)
	require.Equal(t, discoKey, node.DiscoKey)
	require.Len(t, node.Endpoints, 1)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
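
// TestNodeUpdater_setBlockEndpoints_same verifies that setting blockEndpoints
// to its current value does not send a node update.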
func TestNodeUpdater_setBlockEndpoints_same(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	nodeCh := make(chan *Node)
	uut := newNodeUpdater(
		logger,
		func(n *Node) {
			nodeCh <- n
		},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// Then: we don't configure
	requireNeverConfigures(ctx, t, &uut.phased)

	// Given: preferred DERP is 1, so we would send an update on change &&
	// blockEndpoints already set
	uut.L.Lock()
	uut.preferredDERP = 1
	uut.blockEndpoints = true
	uut.L.Unlock()

	// When: we set block endpoints
	uut.setBlockEndpoints(true)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
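
// TestNodeUpdater_fillPeerDiagnostics verifies that fillPeerDiagnostics
// reports the preferred DERP and whether a node has been sent via the
// callback.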
func TestNodeUpdater_fillPeerDiagnostics(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	uut := newNodeUpdater(
		logger,
		func(n *Node) {},
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// At the start, filling diagnostics reports no DERP and SentNode false.
	d := PeerDiagnostics{}
	uut.fillPeerDiagnostics(&d)
	require.Equal(t, 0, d.PreferredDERP)
	require.False(t, d.SentNode)

	dl := map[string]float64{"1": 0.025}
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	// After the node callback, we should get the DERP and SentNode should be
	// true. Use Eventually since there is a race between the callback
	// completing and the test checking.
	require.Eventually(t, func() bool {
		d := PeerDiagnostics{}
		uut.fillPeerDiagnostics(&d)
		// Preferred DERP should be set right away, even if the callback is not
		// complete.
		if !assert.Equal(t, 1, d.PreferredDERP) {
			return false
		}
		return d.SentNode
	}, testutil.WaitShort, testutil.IntervalFast)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}
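
// TestNodeUpdater_fillPeerDiagnostics_noCallback verifies that, with no
// callback set, fillPeerDiagnostics reports the preferred DERP but SentNode
// remains false.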
func TestNodeUpdater_fillPeerDiagnostics_noCallback(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	id := tailcfg.NodeID(1)
	nodeKey := key.NewNode().Public()
	discoKey := key.NewDisco().Public()
	uut := newNodeUpdater(
		logger,
		nil,
		id, nodeKey, discoKey,
	)
	defer uut.close()

	// At the start, filling diagnostics reports no DERP and SentNode false.
	d := PeerDiagnostics{}
	uut.fillPeerDiagnostics(&d)
	require.Equal(t, 0, d.PreferredDERP)
	require.False(t, d.SentNode)

	dl := map[string]float64{"1": 0.025}
	uut.setNetInfo(&tailcfg.NetInfo{
		PreferredDERP: 1,
		DERPLatency:   dl,
	})

	// Since there is no callback, SentNode should not be true, but we get
	// the preferred DERP.
	uut.fillPeerDiagnostics(&d)
	require.Equal(t, 1, d.PreferredDERP)
	require.False(t, d.SentNode)

	done := make(chan struct{})
	go func() {
		defer close(done)
		uut.close()
	}()
	_ = testutil.RequireRecvCtx(ctx, t, done)
}