diff --git a/go.mod b/go.mod
index abb15bd5f2..1fb18fc4b0 100644
--- a/go.mod
+++ b/go.mod
@@ -206,6 +206,8 @@ require (
 
 require go.uber.org/mock v0.4.0
 
+require github.com/benbjohnson/clock v1.3.5 // indirect
+
 require (
 	cloud.google.com/go/compute v1.23.3 // indirect
 	cloud.google.com/go/logging v1.8.1 // indirect
diff --git a/go.sum b/go.sum
index 64ecdd7299..2deb6039a7 100644
--- a/go.sum
+++ b/go.sum
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
 github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
+github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
+github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go
index 20221ef587..2ca1dd17b2 100644
--- a/tailnet/configmaps.go
+++ b/tailnet/configmaps.go
@@ -3,11 +3,15 @@ package tailnet
 import (
 	"context"
 	"errors"
+	"fmt"
 	"net/netip"
 	"sync"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/google/uuid"
 	"go4.org/netipx"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
 	"github.com/coder/coder/v2/tailnet/proto"
 )
 
+const lostTimeout = 15 * time.Minute
+
 // engineConfigurable is the subset of wgengine.Engine that we use for configuration.
 //
 // This allows us to test configuration code without faking the whole interface.
 type engineConfigurable interface {
+	UpdateStatus(*ipnstate.StatusBuilder)
 	SetNetworkMap(*netmap.NetworkMap)
 	Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
 	SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
 	closing       bool
 	phase         phase
 
-	engine    engineConfigurable
-	static    netmap.NetworkMap
-	peers     map[uuid.UUID]*peerLifecycle
-	addresses []netip.Prefix
-	derpMap   *proto.DERPMap
-	logger    slog.Logger
+	engine         engineConfigurable
+	static         netmap.NetworkMap
+	peers          map[uuid.UUID]*peerLifecycle
+	addresses      []netip.Prefix
+	derpMap        *proto.DERPMap
+	logger         slog.Logger
+	blockEndpoints bool
+
+	// for testing
+	clock clock.Clock
 }
 
 func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
 		},
 		peers:     make(map[uuid.UUID]*peerLifecycle),
 		addresses: addresses,
+		clock:     clock.New(),
 	}
 	go c.configLoop()
 	return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
 func (c *configMaps) close() {
 	c.L.Lock()
 	defer c.L.Unlock()
+	for _, lc := range c.peers {
+		lc.resetTimer()
+	}
 	c.closing = true
 	c.Broadcast()
 	for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
 	)
 }
 
+// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
+func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	// Update all the lastHandshake values here. That way we don't have to
+	// worry about them being up-to-date when handling updates below, and it covers
+	// all peers, not just the ones we got updates about.
+	for _, lc := range c.peers {
+		if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+			lc.lastHandshake = peerStatus.LastHandshake
+		}
+	}
+
+	for _, update := range updates {
+		if dirty := c.updatePeerLocked(update, status); dirty {
+			c.netmapDirty = true
+		}
+	}
+	if c.netmapDirty {
+		c.Broadcast()
+	}
+}
+
+// status requests a status update from the engine.
+func (c *configMaps) status() *ipnstate.Status {
+	sb := &ipnstate.StatusBuilder{WantPeers: true}
+	c.engine.UpdateStatus(sb)
+	return sb.Status()
+}
+
+// updatePeerLocked processes a single update for a single peer. It is intended
+// as an internal function since it returns whether or not the config is dirtied by
+// the update (instead of handling it directly like updatePeers). c.L must be held.
+func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
+	id, err := uuid.FromBytes(update.Id)
+	if err != nil {
+		c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
+		return false
+	}
+	logger := c.logger.With(slog.F("peer_id", id))
+	lc, ok := c.peers[id]
+	var node *tailcfg.Node
+	if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
+		// If no preferred DERP is provided, we can't reach the node.
+		if update.Node.PreferredDerp == 0 {
+			logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
+			return false
+		}
+		node, err = c.protoNodeToTailcfg(update.Node)
+		if err != nil {
+			logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
+			return false
+		}
+		logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
+		peerStatus, ok := status.Peer[node.Key]
+		// Starting KeepAlive messages at the initialization of a connection
+		// causes a race condition. If we send the handshake before the peer has
+		// our node, we'll have to wait for 5 seconds before trying again.
+		// Ideally, the first handshake starts when the user first initiates a
+		// connection to the peer. After a successful connection we enable
+		// keep alives to persist the connection and keep it from becoming idle.
+		// SSH connections don't send packets while idle, so we use keep alives
+		// to avoid random hangs while we set up the connection again after
+		// inactivity.
+		node.KeepAlive = ok && peerStatus.Active
+		if c.blockEndpoints {
+			node.Endpoints = nil
+		}
+	}
+	switch {
+	case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// new!
+		var lastHandshake time.Time
+		if ps, ok := status.Peer[node.Key]; ok {
+			lastHandshake = ps.LastHandshake
+		}
+		c.peers[id] = &peerLifecycle{
+			peerID:        id,
+			node:          node,
+			lastHandshake: lastHandshake,
+			lost:          false,
+		}
+		logger.Debug(context.Background(), "adding new peer")
+		return true
+	case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
+		// update
+		node.Created = lc.node.Created
+		dirty = !lc.node.Equal(node)
+		lc.node = node
+		lc.lost = false
+		lc.resetTimer()
+		logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
+		return dirty
+	case !ok:
+		// disconnected or lost, but we don't have the node. No op
+		logger.Debug(context.Background(), "skipping update for peer we don't recognize")
+		return false
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
+		lc.resetTimer()
+		delete(c.peers, id)
+		logger.Debug(context.Background(), "disconnected peer")
+		return true
+	case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
+		lc.lost = true
+		lc.setLostTimer(c)
+		logger.Debug(context.Background(), "marked peer lost")
+		// marking a node lost doesn't change anything right now, so dirty=false
+		return false
+	default:
+		logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
+		return false
+	}
+}
+
+// peerLostTimeout is the callback that fires when a peer has been marked lost
+// and its timeout to receive a handshake expires.
+func (c *configMaps) peerLostTimeout(id uuid.UUID) {
+	logger := c.logger.With(slog.F("peer_id", id))
+	logger.Debug(context.Background(),
+		"peer lost timeout")
+
+	// First do a status update to see if the peer did a handshake while we were
+	// waiting
+	status := c.status()
+	c.L.Lock()
+	defer c.L.Unlock()
+
+	lc, ok := c.peers[id]
+	if !ok {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is removed from the map")
+		return
+	}
+	if peerStatus, ok := status.Peer[lc.node.Key]; ok {
+		lc.lastHandshake = peerStatus.LastHandshake
+	}
+	logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
+	if !lc.lost {
+		logger.Debug(context.Background(),
+			"timeout triggered for peer that is no longer lost")
+		return
+	}
+	since := c.clock.Since(lc.lastHandshake)
+	if since >= lostTimeout {
+		logger.Info(
+			context.Background(), "removing lost peer")
+		delete(c.peers, id)
+		c.netmapDirty = true
+		c.Broadcast()
+		return
+	}
+	logger.Debug(context.Background(),
+		"timeout triggered for peer but it had handshake in meantime")
+	lc.setLostTimer(c)
+}
+
+func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
+	node, err := ProtoToNode(p)
+	if err != nil {
+		return nil, err
+	}
+	return &tailcfg.Node{
+		ID:         tailcfg.NodeID(p.GetId()),
+		Created:    c.clock.Now(),
+		Key:        node.Key,
+		DiscoKey:   node.DiscoKey,
+		Addresses:  node.Addresses,
+		AllowedIPs: node.AllowedIPs,
+		Endpoints:  node.Endpoints,
+		DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
+		Hostinfo:   (&tailcfg.Hostinfo{}).View(),
+	}, nil
+}
+
 type peerLifecycle struct {
-	node *tailcfg.Node
-	// TODO: implement timers to track lost peers
-	// lastHandshake time.Time
-	// timer time.Timer
+	peerID        uuid.UUID
+	node          *tailcfg.Node
+	lost          bool
+	lastHandshake time.Time
+	timer         *clock.Timer
+}
+
+func (l *peerLifecycle) resetTimer() {
+	if l.timer != nil {
+		l.timer.Stop()
+		l.timer = nil
+	}
+}
+
+func (l *peerLifecycle) setLostTimer(c *configMaps) {
+	if l.timer != nil {
+		l.timer.Stop()
+	}
+	ttl := lostTimeout - c.clock.Since(l.lastHandshake)
+	if ttl <= 0 {
+		ttl = time.Nanosecond
+	}
+	l.timer = c.clock.AfterFunc(ttl, func() {
+		c.peerLostTimeout(l.peerID)
+	})
 }
 
 // prefixesDifferent returns true if the two slices contain different prefixes
diff --git a/tailnet/configmaps_internal_test.go b/tailnet/configmaps_internal_test.go
index 0aaad2e15a..3b2c27fad8 100644
--- a/tailnet/configmaps_internal_test.go
+++ b/tailnet/configmaps_internal_test.go
@@ -1,12 +1,17 @@
 package tailnet
 
 import (
+	"context"
 	"net/netip"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/benbjohnson/clock"
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/key"
@@ -15,14 +20,16 @@ import (
 	"tailscale.com/wgengine/router"
 	"tailscale.com/wgengine/wgcfg"
 
+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
+	"github.com/coder/coder/v2/tailnet/proto"
 	"github.com/coder/coder/v2/testutil"
 )
 
 func TestConfigMaps_setAddresses_different(t *testing.T) {
 	t.Parallel()
 	ctx := testutil.Context(t, testutil.WaitShort)
-	logger := slogtest.Make(t, nil)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fEng := newFakeEngineConfigurable()
 	nodePrivateKey := key.NewNode()
 	nodeID := tailcfg.NodeID(5)
@@ -80,7 +87,7 @@ func TestConfigMaps_setAddresses_different(t *testing.T) {
 func TestConfigMaps_setAddresses_same(t *testing.T) {
 	t.Parallel()
 	ctx := testutil.Context(t, testutil.WaitShort)
-	logger := slogtest.Make(t, nil)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	fEng := newFakeEngineConfigurable()
 	nodePrivateKey := key.NewNode()
 	nodeID := tailcfg.NodeID(5)
@@ -89,6 +96,494 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
 	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs)
 	defer uut.close()
 
+	requireNeverConfigures(ctx, t, uut)
+
+	uut.setAddresses(addrs)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func TestConfigMaps_updatePeers_new(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fEng := newFakeEngineConfigurable()
+	nodePrivateKey := key.NewNode()
+	nodeID := tailcfg.NodeID(5)
+	discoKey := key.NewDisco()
+	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+	defer uut.close()
+
+	p1ID := uuid.UUID{1}
+	p1Node := newTestNode(1)
+	p1n, err := NodeToProto(p1Node)
+	require.NoError(t, err)
+	p2ID := uuid.UUID{2}
+	p2Node := newTestNode(2)
+	p2n, err := NodeToProto(p2Node)
+	require.NoError(t, err)
+
+	go func() {
+		b := <-fEng.status
+		b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
+			PublicKey:     p1Node.Key,
+			LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
+			Active:        true,
+		})
+		// peer 2 is missing, so it won't have KeepAlives set
+		fEng.statusDone <- struct{}{}
+	}()
+
+	updates := []*proto.CoordinateResponse_PeerUpdate{
+		{
+			Id:   p1ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_NODE,
+			Node: p1n,
+		},
+		{
+			Id:   p2ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_NODE,
+			Node: p2n,
+		},
+	}
+	uut.updatePeers(updates)
+
+	nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
+	r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
+
+	require.Len(t, nm.Peers, 2)
+	n1 := getNodeWithID(t, nm.Peers, 1)
+	require.Equal(t, "127.3.3.40:1", n1.DERP)
+	require.Equal(t, p1Node.Endpoints, n1.Endpoints)
+	require.True(t, n1.KeepAlive)
+	n2 := getNodeWithID(t, nm.Peers, 2)
+	require.Equal(t, "127.3.3.40:2", n2.DERP)
+	require.Equal(t, p2Node.Endpoints, n2.Endpoints)
+	require.False(t, n2.KeepAlive)
+
+	// we rely on nmcfg.WGCfg() to convert the netmap to wireguard config, so just
+	// require the right number of peers.
+	require.Len(t, r.wg.Peers, 2)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func TestConfigMaps_updatePeers_same(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fEng := newFakeEngineConfigurable()
+	nodePrivateKey := key.NewNode()
+	nodeID := tailcfg.NodeID(5)
+	discoKey := key.NewDisco()
+	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+	defer uut.close()
+
+	// Then: we don't configure
+	requireNeverConfigures(ctx, t, uut)
+
+	p1ID := uuid.UUID{1}
+	p1Node := newTestNode(1)
+	p1n, err := NodeToProto(p1Node)
+	require.NoError(t, err)
+	p1tcn, err := uut.protoNodeToTailcfg(p1n)
+	p1tcn.KeepAlive = true
+	require.NoError(t, err)
+
+	// Given: peer already exists
+	uut.L.Lock()
+	uut.peers[p1ID] = &peerLifecycle{
+		peerID:        p1ID,
+		node:          p1tcn,
+		lastHandshake: time.Date(2024, 1, 7, 12, 0, 10, 0, time.UTC),
+	}
+	uut.L.Unlock()
+
+	go func() {
+		b := <-fEng.status
+		b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
+			PublicKey:     p1Node.Key,
+			LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
+			Active:        true,
+		})
+		fEng.statusDone <- struct{}{}
+	}()
+
+	// When: update with no changes
+	updates := []*proto.CoordinateResponse_PeerUpdate{
+		{
+			Id:   p1ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_NODE,
+			Node: p1n,
+		},
+	}
+	uut.updatePeers(updates)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func TestConfigMaps_updatePeers_disconnect(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fEng := newFakeEngineConfigurable()
+	nodePrivateKey := key.NewNode()
+	nodeID := tailcfg.NodeID(5)
+	discoKey := key.NewDisco()
+	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+	defer uut.close()
+
+	p1ID := uuid.UUID{1}
+	p1Node := newTestNode(1)
+	p1n, err := NodeToProto(p1Node)
+	require.NoError(t, err)
+	p1tcn, err := uut.protoNodeToTailcfg(p1n)
+	p1tcn.KeepAlive = true
+	require.NoError(t, err)
+
+	// set a timer, which should get canceled by the disconnect.
+	timer := uut.clock.AfterFunc(testutil.WaitMedium, func() {
+		t.Error("this should not be called!")
+	})
+
+	// Given: peer already exists
+	uut.L.Lock()
+	uut.peers[p1ID] = &peerLifecycle{
+		peerID:        p1ID,
+		node:          p1tcn,
+		lastHandshake: time.Date(2024, 1, 7, 12, 0, 10, 0, time.UTC),
+		timer:         timer,
+	}
+	uut.L.Unlock()
+
+	go func() {
+		b := <-fEng.status
+		b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
+			PublicKey:     p1Node.Key,
+			LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
+			Active:        true,
+		})
+		fEng.statusDone <- struct{}{}
+	}()
+
+	// When: update DISCONNECTED
+	updates := []*proto.CoordinateResponse_PeerUpdate{
+		{
+			Id:   p1ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
+		},
+	}
+	uut.updatePeers(updates)
+	assert.False(t, timer.Stop(), "timer was not stopped")
+
+	// Then: configure engine without the peer.
+	nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
+	r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
+	require.Len(t, nm.Peers, 0)
+	require.Len(t, r.wg.Peers, 0)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func TestConfigMaps_updatePeers_lost(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fEng := newFakeEngineConfigurable()
+	nodePrivateKey := key.NewNode()
+	nodeID := tailcfg.NodeID(5)
+	discoKey := key.NewDisco()
+	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+	defer uut.close()
+	start := time.Date(2024, time.January, 1, 8, 0, 0, 0, time.UTC)
+	mClock := clock.NewMock()
+	mClock.Set(start)
+	uut.clock = mClock
+
+	p1ID := uuid.UUID{1}
+	p1Node := newTestNode(1)
+	p1n, err := NodeToProto(p1Node)
+	require.NoError(t, err)
+
+	s1 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)
+
+	updates := []*proto.CoordinateResponse_PeerUpdate{
+		{
+			Id:   p1ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_NODE,
+			Node: p1n,
+		},
+	}
+	uut.updatePeers(updates)
+	nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
+	r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
+	require.Len(t, nm.Peers, 1)
+	require.Len(t, r.wg.Peers, 1)
+	_ = testutil.RequireRecvCtx(ctx, t, s1)
+
+	mClock.Add(5 * time.Second)
+
+	s2 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)
+
+	updates[0].Kind = proto.CoordinateResponse_PeerUpdate_LOST
+	updates[0].Node = nil
+	uut.updatePeers(updates)
+	_ = testutil.RequireRecvCtx(ctx, t, s2)
+
+	// No reprogramming yet, since we keep the peer around.
+	select {
+	case <-fEng.setNetworkMap:
+		t.Fatal("should not reprogram")
+	default:
+		// OK!
+	}
+
+	// When we advance the clock, the timeout triggers. However, the new
+	// latest handshake has advanced by a minute, so we don't remove the peer.
+	lh := start.Add(time.Minute)
+	s3 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, lh)
+	mClock.Add(lostTimeout)
+	_ = testutil.RequireRecvCtx(ctx, t, s3)
+	select {
+	case <-fEng.setNetworkMap:
+		t.Fatal("should not reprogram")
+	default:
+		// OK!
+	}
+
+	// Before we update the clock again, we need to be sure the timeout has
+	// completed running. To do that, we check the new lastHandshake has been set
+	require.Eventually(t, func() bool {
+		uut.L.Lock()
+		defer uut.L.Unlock()
+		return uut.peers[p1ID].lastHandshake == lh
+	}, testutil.WaitShort, testutil.IntervalFast)
+
+	// Advance the clock again by a minute, which should trigger the reprogrammed
+	// timeout.
+	s4 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, lh)
+	mClock.Add(time.Minute)
+
+	nm = testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
+	r = testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
+	require.Len(t, nm.Peers, 0)
+	require.Len(t, r.wg.Peers, 0)
+	_ = testutil.RequireRecvCtx(ctx, t, s4)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func TestConfigMaps_updatePeers_lost_and_found(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fEng := newFakeEngineConfigurable()
+	nodePrivateKey := key.NewNode()
+	nodeID := tailcfg.NodeID(5)
+	discoKey := key.NewDisco()
+	uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+	defer uut.close()
+	start := time.Date(2024, time.January, 1, 8, 0, 0, 0, time.UTC)
+	mClock := clock.NewMock()
+	mClock.Set(start)
+	uut.clock = mClock
+
+	p1ID := uuid.UUID{1}
+	p1Node := newTestNode(1)
+	p1n, err := NodeToProto(p1Node)
+	require.NoError(t, err)
+
+	s1 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)
+
+	updates := []*proto.CoordinateResponse_PeerUpdate{
+		{
+			Id:   p1ID[:],
+			Kind: proto.CoordinateResponse_PeerUpdate_NODE,
+			Node: p1n,
+		},
+	}
+	uut.updatePeers(updates)
+	nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
+	r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
+	require.Len(t, nm.Peers, 1)
+	require.Len(t, r.wg.Peers, 1)
+	_ = testutil.RequireRecvCtx(ctx, t, s1)
+
+	mClock.Add(5 * time.Second)
+
+	s2 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)
+
+	updates[0].Kind = proto.CoordinateResponse_PeerUpdate_LOST
+	updates[0].Node = nil
+	uut.updatePeers(updates)
+	_ = testutil.RequireRecvCtx(ctx, t, s2)
+
+	// No reprogramming yet, since we keep the peer around.
+	select {
+	case <-fEng.setNetworkMap:
+		t.Fatal("should not reprogram")
+	default:
+		// OK!
+	}
+
+	mClock.Add(5 * time.Second)
+	s3 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)
+
+	updates[0].Kind = proto.CoordinateResponse_PeerUpdate_NODE
+	updates[0].Node = p1n
+	uut.updatePeers(updates)
+	_ = testutil.RequireRecvCtx(ctx, t, s3)
+	// This does not trigger reprogramming, because we never removed the node
+	select {
+	case <-fEng.setNetworkMap:
+		t.Fatal("should not reprogram")
+	default:
+		// OK!
+	}
+
+	// When we advance the clock, nothing happens because the timeout was
+	// canceled
+	mClock.Add(lostTimeout)
+	select {
+	case <-fEng.setNetworkMap:
+		t.Fatal("should not reprogram")
+	default:
+		// OK!
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		uut.close()
+	}()
+	_ = testutil.RequireRecvCtx(ctx, t, done)
+}
+
+func expectStatusWithHandshake(
+	ctx context.Context, t testing.TB, fEng *fakeEngineConfigurable, k key.NodePublic, lastHandshake time.Time,
+) <-chan struct{} {
+	t.Helper()
+	called := make(chan struct{})
+	go func() {
+		select {
+		case <-ctx.Done():
+			t.Error("timeout waiting for status")
+			return
+		case b := <-fEng.status:
+			b.AddPeer(k, &ipnstate.PeerStatus{
+				PublicKey:     k,
+				LastHandshake: lastHandshake,
+				Active:        true,
+			})
+			select {
+			case <-ctx.Done():
+				t.Error("timeout sending done")
+			case fEng.statusDone <- struct{}{}:
+				close(called)
+				return
+			}
+		}
+	}()
+	return called
+}
+
+func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
+	t.Parallel()
+
+	for _, k := range []proto.CoordinateResponse_PeerUpdate_Kind{
+		proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
+		proto.CoordinateResponse_PeerUpdate_LOST,
+	} {
+		k := k
+		t.Run(k.String(), func(t *testing.T) {
+			t.Parallel()
+			ctx := testutil.Context(t, testutil.WaitShort)
+			logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+			fEng := newFakeEngineConfigurable()
+			nodePrivateKey := key.NewNode()
+			nodeID := tailcfg.NodeID(5)
+			discoKey := key.NewDisco()
+			uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
+			defer uut.close()
+
+			// Then: we don't configure
+			requireNeverConfigures(ctx, t, uut)
+
+			// Given: no known peers
+			go func() {
+				<-fEng.status
+				fEng.statusDone <- struct{}{}
+			}()
+
+			// When: update with LOST/DISCONNECTED
+			p1ID := uuid.UUID{1}
+			updates := []*proto.CoordinateResponse_PeerUpdate{
+				{
+					Id:   p1ID[:],
+					Kind: k,
+				},
+			}
+			uut.updatePeers(updates)
+
+			done := make(chan struct{})
+			go func() {
+				defer close(done)
+				uut.close()
+			}()
+			_ = testutil.RequireRecvCtx(ctx, t, done)
+		})
+	}
+}
+
+func newTestNode(id int) *Node {
+	return &Node{
+		ID:            tailcfg.NodeID(id),
+		AsOf:          time.Date(2024, 1, 7, 12, 13, 14, 15, time.UTC),
+		Key:           key.NewNode().Public(),
+		DiscoKey:      key.NewDisco().Public(),
+		Endpoints:     []string{"192.168.0.55"},
+		PreferredDERP: id,
+	}
+}
+
+func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tailcfg.Node {
+	t.Helper()
+	for _, n := range peers {
+		if n.ID == id {
+			return n
+		}
+	}
+	t.Fatal()
+	return nil
+}
+
+func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) {
+	t.Helper()
 	waiting := make(chan struct{})
 	go func() {
 		// ensure that we never configure, and go straight to closed
@@ -101,15 +596,6 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
 		assert.Equal(t, closed, uut.phase)
 	}()
 	_ = testutil.RequireRecvCtx(ctx, t, waiting)
-
-	uut.setAddresses(addrs)
-
-	done := make(chan struct{})
-	go func() {
-		defer close(done)
-		uut.close()
-	}()
-	_ = testutil.RequireRecvCtx(ctx, t, done)
 }
 
 type reconfigCall struct {
@@ -123,6 +609,16 @@ type fakeEngineConfigurable struct {
 	setNetworkMap chan *netmap.NetworkMap
 	reconfig      chan reconfigCall
 	filter        chan *filter.Filter
+
+	// To fake these fields the test should read from status, do stuff to the
+	// StatusBuilder, then write to statusDone
+	status     chan *ipnstate.StatusBuilder
+	statusDone chan struct{}
+}
+
+func (f fakeEngineConfigurable) UpdateStatus(status *ipnstate.StatusBuilder) {
+	f.status <- status
+	<-f.statusDone
 }
 
 func newFakeEngineConfigurable() *fakeEngineConfigurable {
@@ -130,6 +626,8 @@ func newFakeEngineConfigurable() *fakeEngineConfigurable {
 	return &fakeEngineConfigurable{
 		setNetworkMap: make(chan *netmap.NetworkMap),
 		reconfig:      make(chan reconfigCall),
 		filter:        make(chan *filter.Filter),
+		status:        make(chan *ipnstate.StatusBuilder),
+		statusDone:    make(chan struct{}),
 	}
 }
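
Reviewer note (not part of the patch): the lost-peer handling above hinges on the injected `clock.Clock` from `github.com/benbjohnson/clock`, which is what makes the 15-minute `lostTimeout` testable. The standalone sketch below distills that pattern under stated assumptions; `tracker`, `markLost`, and the `done` channel are illustrative names for this note, not code from the PR, and the real callback re-queries the wireguard engine status before deciding anything.

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/benbjohnson/clock"
)

const lostTimeout = 15 * time.Minute

// tracker is an illustrative reduction of the lost-peer logic: a peer marked
// lost is only dropped once lostTimeout has elapsed since its last handshake,
// measured on an injectable clock.Clock.
type tracker struct {
	mu            sync.Mutex
	clock         clock.Clock
	lastHandshake time.Time
	timer         *clock.Timer
	dropped       bool
	done          chan struct{}
}

func (tr *tracker) markLost() {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	// Schedule the expiry relative to the last handshake, mirroring setLostTimer.
	ttl := lostTimeout - tr.clock.Since(tr.lastHandshake)
	if ttl <= 0 {
		ttl = time.Nanosecond
	}
	tr.timer = tr.clock.AfterFunc(ttl, func() {
		tr.mu.Lock()
		defer tr.mu.Unlock()
		// The real code re-checks engine status here; this sketch only
		// compares against the stored handshake time.
		if tr.clock.Since(tr.lastHandshake) >= lostTimeout {
			tr.dropped = true
		}
		close(tr.done)
	})
}

func main() {
	// In tests, a mock clock replaces clock.New() so time is advanced explicitly.
	mock := clock.NewMock()
	mock.Set(time.Date(2024, 1, 1, 8, 0, 0, 0, time.UTC))

	tr := &tracker{clock: mock, lastHandshake: mock.Now(), done: make(chan struct{})}
	tr.markLost()

	mock.Add(lostTimeout) // deterministically fires the AfterFunc
	<-tr.done

	tr.mu.Lock()
	fmt.Println("dropped:", tr.dropped) // dropped: true
	tr.mu.Unlock()
}
```

The same substitution is what `TestConfigMaps_updatePeers_lost` relies on: production code takes `clock.New()`, the test swaps in `clock.NewMock()` and drives timers with `mClock.Add(...)` instead of sleeping.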
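A second note on the test double: `fakeEngineConfigurable.UpdateStatus` is faked with a pair of unbuffered channels, so the code under test hands its `*ipnstate.StatusBuilder` to the test over `status` and blocks until the test writes to `statusDone`. A generic sketch of that hand-off follows; `fakeService` and `Query` are made-up names used only to illustrate the synchronization, not APIs from this PR or from Tailscale.

```go
package main

import "fmt"

// fakeService mirrors the channel hand-off used by fakeEngineConfigurable:
// the code under test sends a mutable "builder" over req and blocks until the
// test signals done, giving the test a window to populate it.
type fakeService struct {
	req  chan *[]string
	done chan struct{}
}

// Query is the faked dependency call: hand the builder to the test, then wait.
func (f *fakeService) Query(out *[]string) {
	f.req <- out
	<-f.done
}

func main() {
	f := &fakeService{req: make(chan *[]string), done: make(chan struct{})}

	// The "test" side: receive the builder, fill it in, then release the caller.
	go func() {
		out := <-f.req
		*out = append(*out, "peer-1")
		f.done <- struct{}{}
	}()

	// The "code under test" side.
	var peers []string
	f.Query(&peers)
	fmt.Println(peers) // [peer-1]
}
```

Because both channels are unbuffered, every status request is observed and answered exactly once, which is what lets helpers like `expectStatusWithHandshake` assert on each call.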