From e801e878ba8d27c90da0a17635928a4537db07f8 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Wed, 10 Apr 2024 17:15:33 -0500 Subject: [PATCH] feat: add agent acks to in-memory coordinator (#12786) When an agent receives a node, it responds with an ACK which is relayed to the client. After the client receives the ACK, it's allowed to begin pinging. --- codersdk/workspacesdk/connector.go | 4 +- ...nal_test.go => connector_internal_test.go} | 2 + enterprise/coderd/workspaceproxy.go | 2 +- tailnet/configmaps.go | 202 +++++++++++--- tailnet/configmaps_internal_test.go | 255 +++++++++++++++++- tailnet/conn.go | 5 +- tailnet/coordinator.go | 88 ++++++ tailnet/coordinator_test.go | 143 ++++++++++ tailnet/proto/tailnet.pb.go | 252 +++++++++++------ tailnet/proto/tailnet.proto | 11 + tailnet/tailnettest/coordinateemock.go | 13 + tailnet/tunnel.go | 10 + tailnet/tunnel_internal_test.go | 15 ++ 13 files changed, 879 insertions(+), 123 deletions(-) rename codersdk/workspacesdk/{workspacesdk_internal_test.go => connector_internal_test.go} (98%) diff --git a/codersdk/workspacesdk/connector.go b/codersdk/workspacesdk/connector.go index 5c1d9e600a..7955e8fb33 100644 --- a/codersdk/workspacesdk/connector.go +++ b/codersdk/workspacesdk/connector.go @@ -86,9 +86,11 @@ func runTailnetAPIConnector( func (tac *tailnetAPIConnector) manageGracefulTimeout() { defer tac.cancelGracefulCtx() <-tac.ctx.Done() + timer := time.NewTimer(time.Second) + defer timer.Stop() select { case <-tac.closed: - case <-time.After(time.Second): + case <-timer.C: } } diff --git a/codersdk/workspacesdk/workspacesdk_internal_test.go b/codersdk/workspacesdk/connector_internal_test.go similarity index 98% rename from codersdk/workspacesdk/workspacesdk_internal_test.go rename to codersdk/workspacesdk/connector_internal_test.go index 57e6f751ff..9f70891fda 100644 --- a/codersdk/workspacesdk/workspacesdk_internal_test.go +++ b/codersdk/workspacesdk/connector_internal_test.go @@ -102,6 +102,8 @@ func (*fakeTailnetConn) SetNodeCallback(func(*tailnet.Node)) {} func (*fakeTailnetConn) SetDERPMap(*tailcfg.DERPMap) {} +func (*fakeTailnetConn) SetTunnelDestination(uuid.UUID) {} + func newFakeTailnetConn() *fakeTailnetConn { return &fakeTailnetConn{} } diff --git a/enterprise/coderd/workspaceproxy.go b/enterprise/coderd/workspaceproxy.go index 379d01ad43..234212f479 100644 --- a/enterprise/coderd/workspaceproxy.go +++ b/enterprise/coderd/workspaceproxy.go @@ -658,7 +658,7 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request) if err != nil { return xerrors.Errorf("insert replica: %w", err) } - } else if err != nil { + } else { return xerrors.Errorf("get replica: %w", err) } diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index 57a2d9f2d1..8b3aee1585 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -186,7 +186,7 @@ func (c *configMaps) close() { c.L.Lock() defer c.L.Unlock() for _, lc := range c.peers { - lc.resetTimer() + lc.resetLostTimer() } c.closing = true c.Broadcast() @@ -216,6 +216,12 @@ func (c *configMaps) netMapLocked() *netmap.NetworkMap { func (c *configMaps) peerConfigLocked() []*tailcfg.Node { out := make([]*tailcfg.Node, 0, len(c.peers)) for _, p := range c.peers { + // Don't add nodes that we havent received a READY_FOR_HANDSHAKE for + // yet, if they're a destination. If we received a READY_FOR_HANDSHAKE + // for a peer before we receive their node, the node will be nil. 
+ if (!p.readyForHandshake && p.isDestination) || p.node == nil { + continue + } n := p.node.Clone() if c.blockEndpoints { n.Endpoints = nil @@ -225,6 +231,19 @@ func (c *configMaps) peerConfigLocked() []*tailcfg.Node { return out } +func (c *configMaps) setTunnelDestination(id uuid.UUID) { + c.L.Lock() + defer c.L.Unlock() + lc, ok := c.peers[id] + if !ok { + lc = &peerLifecycle{ + peerID: id, + } + c.peers[id] = lc + } + lc.isDestination = true +} + // setAddresses sets the addresses belonging to this node to the given slice. It // triggers configuration of the engine if the addresses have changed. // c.L MUST NOT be held. @@ -331,8 +350,10 @@ func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) // worry about them being up-to-date when handling updates below, and it covers // all peers, not just the ones we got updates about. for _, lc := range c.peers { - if peerStatus, ok := status.Peer[lc.node.Key]; ok { - lc.lastHandshake = peerStatus.LastHandshake + if lc.node != nil { + if peerStatus, ok := status.Peer[lc.node.Key]; ok { + lc.lastHandshake = peerStatus.LastHandshake + } } } @@ -363,7 +384,7 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat return false } logger := c.logger.With(slog.F("peer_id", id)) - lc, ok := c.peers[id] + lc, peerOk := c.peers[id] var node *tailcfg.Node if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE { // If no preferred DERP is provided, we can't reach the node. @@ -377,48 +398,76 @@ func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdat return false } logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node)) - peerStatus, ok := status.Peer[node.Key] - // Starting KeepAlive messages at the initialization of a connection - // causes a race condition. If we send the handshake before the peer has - // our node, we'll have to wait for 5 seconds before trying again. - // Ideally, the first handshake starts when the user first initiates a - // connection to the peer. After a successful connection we enable - // keep alives to persist the connection and keep it from becoming idle. - // SSH connections don't send packets while idle, so we use keep alives - // to avoid random hangs while we set up the connection again after - // inactivity. - node.KeepAlive = ok && peerStatus.Active + node.KeepAlive = c.nodeKeepalive(lc, status, node) } switch { - case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE: + case !peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE: // new! var lastHandshake time.Time if ps, ok := status.Peer[node.Key]; ok { lastHandshake = ps.LastHandshake } - c.peers[id] = &peerLifecycle{ + lc = &peerLifecycle{ peerID: id, node: node, lastHandshake: lastHandshake, lost: false, } + c.peers[id] = lc logger.Debug(context.Background(), "adding new peer") - return true - case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE: + return lc.validForWireguard() + case peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE: // update - node.Created = lc.node.Created + if lc.node != nil { + node.Created = lc.node.Created + } dirty = !lc.node.Equal(node) lc.node = node + // validForWireguard checks that the node is non-nil, so should be + // called after we update the node. 
+ dirty = dirty && lc.validForWireguard() lc.lost = false - lc.resetTimer() + lc.resetLostTimer() + if lc.isDestination && !lc.readyForHandshake { + // We received the node of a destination peer before we've received + // their READY_FOR_HANDSHAKE. Set a timer + lc.setReadyForHandshakeTimer(c) + logger.Debug(context.Background(), "setting ready for handshake timeout") + } logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty)) return dirty - case !ok: + case peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE: + dirty := !lc.readyForHandshake + lc.readyForHandshake = true + if lc.readyForHandshakeTimer != nil { + lc.readyForHandshakeTimer.Stop() + } + if lc.node != nil { + old := lc.node.KeepAlive + lc.node.KeepAlive = c.nodeKeepalive(lc, status, lc.node) + dirty = dirty || (old != lc.node.KeepAlive) + } + logger.Debug(context.Background(), "peer ready for handshake") + // only force a reconfig if the node populated + return dirty && lc.node != nil + case !peerOk && update.Kind == proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE: + // When we receive a READY_FOR_HANDSHAKE for a peer we don't know about, + // we create a peerLifecycle with the peerID and set readyForHandshake + // to true. Eventually we should receive a NODE update for this peer, + // and it'll be programmed into wireguard. + logger.Debug(context.Background(), "got peer ready for handshake for unknown peer") + lc = &peerLifecycle{ + peerID: id, + readyForHandshake: true, + } + c.peers[id] = lc + return false + case !peerOk: // disconnected or lost, but we don't have the node. No op logger.Debug(context.Background(), "skipping update for peer we don't recognize") return false case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED: - lc.resetTimer() + lc.resetLostTimer() delete(c.peers, id) logger.Debug(context.Background(), "disconnected peer") return true @@ -476,10 +525,12 @@ func (c *configMaps) peerLostTimeout(id uuid.UUID) { "timeout triggered for peer that is removed from the map") return } - if peerStatus, ok := status.Peer[lc.node.Key]; ok { - lc.lastHandshake = peerStatus.LastHandshake + if lc.node != nil { + if peerStatus, ok := status.Peer[lc.node.Key]; ok { + lc.lastHandshake = peerStatus.LastHandshake + } + logger = logger.With(slog.F("key_id", lc.node.Key.ShortString())) } - logger = logger.With(slog.F("key_id", lc.node.Key.ShortString())) if !lc.lost { logger.Debug(context.Background(), "timeout triggered for peer that is no longer lost") @@ -522,7 +573,7 @@ func (c *configMaps) nodeAddresses(publicKey key.NodePublic) ([]netip.Prefix, bo c.L.Lock() defer c.L.Unlock() for _, lc := range c.peers { - if lc.node.Key == publicKey { + if lc.node != nil && lc.node.Key == publicKey { return lc.node.Addresses, true } } @@ -539,9 +590,10 @@ func (c *configMaps) fillPeerDiagnostics(d *PeerDiagnostics, peerID uuid.UUID) { } } lc, ok := c.peers[peerID] - if !ok { + if !ok || lc.node == nil { return } + d.ReceivedNode = lc.node ps, ok := status.Peer[lc.node.Key] if !ok { @@ -550,34 +602,102 @@ func (c *configMaps) fillPeerDiagnostics(d *PeerDiagnostics, peerID uuid.UUID) { d.LastWireguardHandshake = ps.LastHandshake } -type peerLifecycle struct { - peerID uuid.UUID - node *tailcfg.Node - lost bool - lastHandshake time.Time - timer *clock.Timer +func (c *configMaps) peerReadyForHandshakeTimeout(peerID uuid.UUID) { + logger := c.logger.With(slog.F("peer_id", peerID)) + logger.Debug(context.Background(), "peer ready for handshake timeout") + 
c.L.Lock() + defer c.L.Unlock() + lc, ok := c.peers[peerID] + if !ok { + logger.Debug(context.Background(), + "ready for handshake timeout triggered for peer that is removed from the map") + return + } + + wasReady := lc.readyForHandshake + lc.readyForHandshake = true + if !wasReady { + logger.Info(context.Background(), "setting peer ready for handshake after timeout") + c.netmapDirty = true + c.Broadcast() + } } -func (l *peerLifecycle) resetTimer() { - if l.timer != nil { - l.timer.Stop() - l.timer = nil +func (*configMaps) nodeKeepalive(lc *peerLifecycle, status *ipnstate.Status, node *tailcfg.Node) bool { + // If the peer is already active, keepalives should be enabled. + if peerStatus, statusOk := status.Peer[node.Key]; statusOk && peerStatus.Active { + return true + } + // If the peer is a destination, we should only enable keepalives if we've + // received the READY_FOR_HANDSHAKE. + if lc != nil && lc.isDestination && lc.readyForHandshake { + return true + } + + // If none of the above are true, keepalives should not be enabled. + return false +} + +type peerLifecycle struct { + peerID uuid.UUID + // isDestination specifies if the peer is a destination, meaning we + // initiated a tunnel to the peer. When the peer is a destination, we do not + // respond to node updates with `READY_FOR_HANDSHAKE`s, and we wait to + // program the peer into wireguard until we receive a READY_FOR_HANDSHAKE + // from the peer or the timeout is reached. + isDestination bool + // node is the tailcfg.Node for the peer. It may be nil until we receive a + // NODE update for it. + node *tailcfg.Node + lost bool + lastHandshake time.Time + lostTimer *clock.Timer + readyForHandshake bool + readyForHandshakeTimer *clock.Timer +} + +func (l *peerLifecycle) resetLostTimer() { + if l.lostTimer != nil { + l.lostTimer.Stop() + l.lostTimer = nil } } func (l *peerLifecycle) setLostTimer(c *configMaps) { - if l.timer != nil { - l.timer.Stop() + if l.lostTimer != nil { + l.lostTimer.Stop() } ttl := lostTimeout - c.clock.Since(l.lastHandshake) if ttl <= 0 { ttl = time.Nanosecond } - l.timer = c.clock.AfterFunc(ttl, func() { + l.lostTimer = c.clock.AfterFunc(ttl, func() { c.peerLostTimeout(l.peerID) }) } +const readyForHandshakeTimeout = 5 * time.Second + +func (l *peerLifecycle) setReadyForHandshakeTimer(c *configMaps) { + if l.readyForHandshakeTimer != nil { + l.readyForHandshakeTimer.Stop() + } + l.readyForHandshakeTimer = c.clock.AfterFunc(readyForHandshakeTimeout, func() { + c.logger.Debug(context.Background(), "ready for handshake timeout", slog.F("peer_id", l.peerID)) + c.peerReadyForHandshakeTimeout(l.peerID) + }) +} + +// validForWireguard returns true if the peer is ready to be programmed into +// wireguard. +func (l *peerLifecycle) validForWireguard() bool { + valid := l.node != nil + if l.isDestination { + return valid && l.readyForHandshake + } + return valid +} + // prefixesDifferent returns true if the two slices contain different prefixes // where order doesn't matter. 
func prefixesDifferent(a, b []netip.Prefix) bool { diff --git a/tailnet/configmaps_internal_test.go b/tailnet/configmaps_internal_test.go index 1008562904..49171ecf03 100644 --- a/tailnet/configmaps_internal_test.go +++ b/tailnet/configmaps_internal_test.go @@ -185,6 +185,258 @@ func TestConfigMaps_updatePeers_new(t *testing.T) { _ = testutil.RequireRecvCtx(ctx, t, done) } +func TestConfigMaps_updatePeers_new_waitForHandshake_neverConfigures(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + fEng := newFakeEngineConfigurable() + nodePrivateKey := key.NewNode() + nodeID := tailcfg.NodeID(5) + discoKey := key.NewDisco() + uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public()) + defer uut.close() + start := time.Date(2024, time.March, 29, 8, 0, 0, 0, time.UTC) + mClock := clock.NewMock() + mClock.Set(start) + uut.clock = mClock + + p1ID := uuid.UUID{1} + p1Node := newTestNode(1) + p1n, err := NodeToProto(p1Node) + require.NoError(t, err) + uut.setTunnelDestination(p1ID) + + // it should not send the peer to the netmap + requireNeverConfigures(ctx, t, &uut.phased) + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u1 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_NODE, + Node: p1n, + }, + } + uut.updatePeers(u1) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + +func TestConfigMaps_updatePeers_new_waitForHandshake_outOfOrder(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + fEng := newFakeEngineConfigurable() + nodePrivateKey := key.NewNode() + nodeID := tailcfg.NodeID(5) + discoKey := key.NewDisco() + uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public()) + defer uut.close() + start := time.Date(2024, time.March, 29, 8, 0, 0, 0, time.UTC) + mClock := clock.NewMock() + mClock.Set(start) + uut.clock = mClock + + p1ID := uuid.UUID{1} + p1Node := newTestNode(1) + p1n, err := NodeToProto(p1Node) + require.NoError(t, err) + uut.setTunnelDestination(p1ID) + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u2 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE, + }, + } + uut.updatePeers(u2) + + // it should not send the peer to the netmap yet + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u1 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_NODE, + Node: p1n, + }, + } + uut.updatePeers(u1) + + // it should now send the peer to the netmap + + nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap) + r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig) + + require.Len(t, nm.Peers, 1) + n1 := getNodeWithID(t, nm.Peers, 1) + require.Equal(t, "127.3.3.40:1", n1.DERP) + require.Equal(t, p1Node.Endpoints, n1.Endpoints) + require.True(t, n1.KeepAlive) + + // we rely on nmcfg.WGCfg() to convert the netmap to wireguard config, so just + // require the right number of peers. 
+ require.Len(t, r.wg.Peers, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + +func TestConfigMaps_updatePeers_new_waitForHandshake(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + fEng := newFakeEngineConfigurable() + nodePrivateKey := key.NewNode() + nodeID := tailcfg.NodeID(5) + discoKey := key.NewDisco() + uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public()) + defer uut.close() + start := time.Date(2024, time.March, 29, 8, 0, 0, 0, time.UTC) + mClock := clock.NewMock() + mClock.Set(start) + uut.clock = mClock + + p1ID := uuid.UUID{1} + p1Node := newTestNode(1) + p1n, err := NodeToProto(p1Node) + require.NoError(t, err) + uut.setTunnelDestination(p1ID) + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u1 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_NODE, + Node: p1n, + }, + } + uut.updatePeers(u1) + + // it should not send the peer to the netmap yet + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u2 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE, + }, + } + uut.updatePeers(u2) + + // it should now send the peer to the netmap + + nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap) + r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig) + + require.Len(t, nm.Peers, 1) + n1 := getNodeWithID(t, nm.Peers, 1) + require.Equal(t, "127.3.3.40:1", n1.DERP) + require.Equal(t, p1Node.Endpoints, n1.Endpoints) + require.True(t, n1.KeepAlive) + + // we rely on nmcfg.WGCfg() to convert the netmap to wireguard config, so just + // require the right number of peers. + require.Len(t, r.wg.Peers, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + +func TestConfigMaps_updatePeers_new_waitForHandshake_timeout(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + fEng := newFakeEngineConfigurable() + nodePrivateKey := key.NewNode() + nodeID := tailcfg.NodeID(5) + discoKey := key.NewDisco() + uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public()) + defer uut.close() + start := time.Date(2024, time.March, 29, 8, 0, 0, 0, time.UTC) + mClock := clock.NewMock() + mClock.Set(start) + uut.clock = mClock + + p1ID := uuid.UUID{1} + p1Node := newTestNode(1) + p1n, err := NodeToProto(p1Node) + require.NoError(t, err) + uut.setTunnelDestination(p1ID) + + go func() { + <-fEng.status + fEng.statusDone <- struct{}{} + }() + + u1 := []*proto.CoordinateResponse_PeerUpdate{ + { + Id: p1ID[:], + Kind: proto.CoordinateResponse_PeerUpdate_NODE, + Node: p1n, + }, + } + uut.updatePeers(u1) + + mClock.Add(5 * time.Second) + + // it should now send the peer to the netmap + + nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap) + r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig) + + require.Len(t, nm.Peers, 1) + n1 := getNodeWithID(t, nm.Peers, 1) + require.Equal(t, "127.3.3.40:1", n1.DERP) + require.Equal(t, p1Node.Endpoints, n1.Endpoints) + require.False(t, n1.KeepAlive) + + // we rely on nmcfg.WGCfg() to convert the netmap to wireguard config, so just + // require the right number of peers. 
+ require.Len(t, r.wg.Peers, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + func TestConfigMaps_updatePeers_same(t *testing.T) { t.Parallel() ctx := testutil.Context(t, testutil.WaitShort) @@ -274,7 +526,7 @@ func TestConfigMaps_updatePeers_disconnect(t *testing.T) { peerID: p1ID, node: p1tcn, lastHandshake: time.Date(2024, 1, 7, 12, 0, 10, 0, time.UTC), - timer: timer, + lostTimer: timer, } uut.L.Unlock() @@ -947,6 +1199,7 @@ func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) { t.Helper() waiting := make(chan struct{}) go func() { + t.Helper() // ensure that we never configure, and go straight to closed uut.L.Lock() defer uut.L.Unlock() diff --git a/tailnet/conn.go b/tailnet/conn.go index e6dbdfdc38..d4d58c7cc9 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -88,7 +88,6 @@ type Options struct { // falling back. This is useful for misbehaving proxies that prevent // fallback due to odd behavior, like Azure App Proxy. DERPForceWebSockets bool - // BlockEndpoints specifies whether P2P endpoints are blocked. // If so, only DERPs can establish connections. BlockEndpoints bool @@ -311,6 +310,10 @@ type Conn struct { trafficStats *connstats.Statistics } +func (c *Conn) SetTunnelDestination(id uuid.UUID) { + c.configMaps.setTunnelDestination(id) +} + func (c *Conn) GetBlockEndpoints() bool { return c.configMaps.getBlockEndpoints() && c.nodeUpdater.getBlockEndpoints() } diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index ce9c8e99b2..95f61637f7 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -99,6 +99,9 @@ type Coordinatee interface { UpdatePeers([]*proto.CoordinateResponse_PeerUpdate) error SetAllPeersLost() SetNodeCallback(func(*Node)) + // SetTunnelDestination indicates to tailnet that the peer id is a + // destination. + SetTunnelDestination(id uuid.UUID) } type Coordination interface { @@ -111,6 +114,7 @@ type remoteCoordination struct { closed bool errChan chan error coordinatee Coordinatee + tgt uuid.UUID logger slog.Logger protocol proto.DRPCTailnet_CoordinateClient respLoopDone chan struct{} @@ -161,11 +165,37 @@ func (c *remoteCoordination) respLoop() { c.sendErr(xerrors.Errorf("read: %w", err)) return } + err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates()) if err != nil { c.sendErr(xerrors.Errorf("update peers: %w", err)) return } + + // Only send acks from peers without a target. + if c.tgt == uuid.Nil { + // Send an ack back for all received peers. This could + // potentially be smarter to only send an ACK once per client, + // but there's nothing currently stopping clients from reusing + // IDs. 
+ rfh := []*proto.CoordinateRequest_ReadyForHandshake{} + for _, peer := range resp.GetPeerUpdates() { + if peer.Kind != proto.CoordinateResponse_PeerUpdate_NODE { + continue + } + + rfh = append(rfh, &proto.CoordinateRequest_ReadyForHandshake{Id: peer.Id}) + } + if len(rfh) > 0 { + err := c.protocol.Send(&proto.CoordinateRequest{ + ReadyForHandshake: rfh, + }) + if err != nil { + c.sendErr(xerrors.Errorf("send: %w", err)) + return + } + } + } } } @@ -179,11 +209,14 @@ func NewRemoteCoordination(logger slog.Logger, c := &remoteCoordination{ errChan: make(chan error, 1), coordinatee: coordinatee, + tgt: tunnelTarget, logger: logger, protocol: protocol, respLoopDone: make(chan struct{}), } if tunnelTarget != uuid.Nil { + // TODO: reenable in upstack PR + // c.coordinatee.SetTunnelDestination(tunnelTarget) c.Lock() err := c.protocol.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: tunnelTarget[:]}}) c.Unlock() @@ -327,6 +360,13 @@ func (c *inMemoryCoordination) respLoop() { } } +func (*inMemoryCoordination) AwaitAck() <-chan struct{} { + // This is only used for tests, so just return a closed channel. + ch := make(chan struct{}) + close(ch) + return ch +} + func (c *inMemoryCoordination) Close() error { c.Lock() defer c.Unlock() @@ -658,6 +698,54 @@ func (c *core) handleRequest(p *peer, req *proto.CoordinateRequest) error { if req.Disconnect != nil { c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect") } + if rfhs := req.ReadyForHandshake; rfhs != nil { + err := c.handleReadyForHandshakeLocked(pr, rfhs) + if err != nil { + return xerrors.Errorf("handle ack: %w", err) + } + } + return nil +} + +func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.CoordinateRequest_ReadyForHandshake) error { + for _, rfh := range rfhs { + dstID, err := uuid.FromBytes(rfh.Id) + if err != nil { + // this shouldn't happen unless there is a client error. Close the connection so the client + // doesn't just happily continue thinking everything is fine. + return xerrors.Errorf("unable to convert bytes to UUID: %w", err) + } + + if !c.tunnels.tunnelExists(src.id, dstID) { + // We intentionally do not return an error here, since it's + // inherently racy. It's possible for a source to connect, then + // subsequently disconnect before the agent has sent back the RFH. + // Since this could potentially happen to a non-malicious agent, we + // don't want to kill its connection. 
+ select { + case src.resps <- &proto.CoordinateResponse{ + Error: fmt.Sprintf("you do not share a tunnel with %q", dstID.String()), + }: + default: + return ErrWouldBlock + } + continue + } + + dst, ok := c.peers[dstID] + if ok { + select { + case dst.resps <- &proto.CoordinateResponse{ + PeerUpdates: []*proto.CoordinateResponse_PeerUpdate{{ + Id: src.id[:], + Kind: proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE, + }}, + }: + default: + return ErrWouldBlock + } + } + } return nil } diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go index d8a6f297b5..c4e269c53c 100644 --- a/tailnet/coordinator_test.go +++ b/tailnet/coordinator_test.go @@ -412,6 +412,68 @@ func TestCoordinator(t *testing.T) { _ = testutil.RequireRecvCtx(ctx, t, clientErrChan) _ = testutil.RequireRecvCtx(ctx, t, closeClientChan) }) + + t.Run("AgentAck", func(t *testing.T) { + t.Parallel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) + ctx := testutil.Context(t, testutil.WaitShort) + + clientID := uuid.New() + agentID := uuid.New() + + aReq, aRes := coordinator.Coordinate(ctx, agentID, agentID.String(), tailnet.AgentCoordinateeAuth{ID: agentID}) + cReq, cRes := coordinator.Coordinate(ctx, clientID, clientID.String(), tailnet.ClientCoordinateeAuth{AgentID: agentID}) + + { + nk, err := key.NewNode().Public().MarshalBinary() + require.NoError(t, err) + dk, err := key.NewDisco().Public().MarshalText() + require.NoError(t, err) + cReq <- &proto.CoordinateRequest{UpdateSelf: &proto.CoordinateRequest_UpdateSelf{ + Node: &proto.Node{ + Id: 3, + Key: nk, + Disco: string(dk), + }, + }} + } + + cReq <- &proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{ + Id: agentID[:], + }} + + testutil.RequireRecvCtx(ctx, t, aRes) + + aReq <- &proto.CoordinateRequest{ReadyForHandshake: []*proto.CoordinateRequest_ReadyForHandshake{{ + Id: clientID[:], + }}} + ack := testutil.RequireRecvCtx(ctx, t, cRes) + require.NotNil(t, ack.PeerUpdates) + require.Len(t, ack.PeerUpdates, 1) + require.Equal(t, proto.CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE, ack.PeerUpdates[0].Kind) + require.Equal(t, agentID[:], ack.PeerUpdates[0].Id) + }) + + t.Run("AgentAck_NoPermission", func(t *testing.T) { + t.Parallel() + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) + ctx := testutil.Context(t, testutil.WaitShort) + + clientID := uuid.New() + agentID := uuid.New() + + aReq, aRes := coordinator.Coordinate(ctx, agentID, agentID.String(), tailnet.AgentCoordinateeAuth{ID: agentID}) + _, _ = coordinator.Coordinate(ctx, clientID, clientID.String(), tailnet.ClientCoordinateeAuth{AgentID: agentID}) + + aReq <- &proto.CoordinateRequest{ReadyForHandshake: []*proto.CoordinateRequest_ReadyForHandshake{{ + Id: clientID[:], + }}} + + rfhError := testutil.RequireRecvCtx(ctx, t, aRes) + require.NotEmpty(t, rfhError.Error) + }) } // TestCoordinator_AgentUpdateWhileClientConnects tests for regression on @@ -638,6 +700,76 @@ func TestRemoteCoordination(t *testing.T) { } } +func TestRemoteCoordination_SendsReadyForHandshake(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + clientID := uuid.UUID{1} + agentID := uuid.UUID{2} + mCoord := tailnettest.NewMockCoordinator(gomock.NewController(t)) + fConn := &fakeCoordinatee{} + + reqs := make(chan *proto.CoordinateRequest, 100) + resps := make(chan 
*proto.CoordinateResponse, 100) + mCoord.EXPECT().Coordinate(gomock.Any(), clientID, gomock.Any(), tailnet.ClientCoordinateeAuth{agentID}). + Times(1).Return(reqs, resps) + + var coord tailnet.Coordinator = mCoord + coordPtr := atomic.Pointer[tailnet.Coordinator]{} + coordPtr.Store(&coord) + svc, err := tailnet.NewClientService( + logger.Named("svc"), &coordPtr, + time.Hour, + func() *tailcfg.DERPMap { panic("not implemented") }, + ) + require.NoError(t, err) + sC, cC := net.Pipe() + + serveErr := make(chan error, 1) + go func() { + err := svc.ServeClient(ctx, proto.CurrentVersion.String(), sC, clientID, agentID) + serveErr <- err + }() + + client, err := tailnet.NewDRPCClient(cC, logger) + require.NoError(t, err) + protocol, err := client.Coordinate(ctx) + require.NoError(t, err) + + uut := tailnet.NewRemoteCoordination(logger.Named("coordination"), protocol, fConn, uuid.UUID{}) + defer uut.Close() + + nk, err := key.NewNode().Public().MarshalBinary() + require.NoError(t, err) + dk, err := key.NewDisco().Public().MarshalText() + require.NoError(t, err) + testutil.RequireSendCtx(ctx, t, resps, &proto.CoordinateResponse{ + PeerUpdates: []*proto.CoordinateResponse_PeerUpdate{{ + Id: clientID[:], + Kind: proto.CoordinateResponse_PeerUpdate_NODE, + Node: &proto.Node{ + Id: 3, + Key: nk, + Disco: string(dk), + }, + }}, + }) + + rfh := testutil.RequireRecvCtx(ctx, t, reqs) + require.NotNil(t, rfh.ReadyForHandshake) + require.Len(t, rfh.ReadyForHandshake, 1) + require.Equal(t, clientID[:], rfh.ReadyForHandshake[0].Id) + + require.NoError(t, uut.Close()) + + select { + case err := <-uut.Error(): + require.ErrorContains(t, err, "stream terminated by sending close") + default: + // OK! + } +} + // coordinationTest tests that a coordination behaves correctly func coordinationTest( ctx context.Context, t *testing.T, @@ -698,6 +830,7 @@ type fakeCoordinatee struct { callback func(*tailnet.Node) updates [][]*proto.CoordinateResponse_PeerUpdate setAllPeersLostCalls int + tunnelDestinations map[uuid.UUID]struct{} } func (f *fakeCoordinatee) UpdatePeers(updates []*proto.CoordinateResponse_PeerUpdate) error { @@ -713,6 +846,16 @@ func (f *fakeCoordinatee) SetAllPeersLost() { f.setAllPeersLostCalls++ } +func (f *fakeCoordinatee) SetTunnelDestination(id uuid.UUID) { + f.Lock() + defer f.Unlock() + + if f.tunnelDestinations == nil { + f.tunnelDestinations = map[uuid.UUID]struct{}{} + } + f.tunnelDestinations[id] = struct{}{} +} + func (f *fakeCoordinatee) SetNodeCallback(callback func(*tailnet.Node)) { f.Lock() defer f.Unlock() diff --git a/tailnet/proto/tailnet.pb.go b/tailnet/proto/tailnet.pb.go index 63444f2173..5f623cf2b8 100644 --- a/tailnet/proto/tailnet.pb.go +++ b/tailnet/proto/tailnet.pb.go @@ -24,10 +24,11 @@ const ( type CoordinateResponse_PeerUpdate_Kind int32 const ( - CoordinateResponse_PeerUpdate_KIND_UNSPECIFIED CoordinateResponse_PeerUpdate_Kind = 0 - CoordinateResponse_PeerUpdate_NODE CoordinateResponse_PeerUpdate_Kind = 1 - CoordinateResponse_PeerUpdate_DISCONNECTED CoordinateResponse_PeerUpdate_Kind = 2 - CoordinateResponse_PeerUpdate_LOST CoordinateResponse_PeerUpdate_Kind = 3 + CoordinateResponse_PeerUpdate_KIND_UNSPECIFIED CoordinateResponse_PeerUpdate_Kind = 0 + CoordinateResponse_PeerUpdate_NODE CoordinateResponse_PeerUpdate_Kind = 1 + CoordinateResponse_PeerUpdate_DISCONNECTED CoordinateResponse_PeerUpdate_Kind = 2 + CoordinateResponse_PeerUpdate_LOST CoordinateResponse_PeerUpdate_Kind = 3 + CoordinateResponse_PeerUpdate_READY_FOR_HANDSHAKE CoordinateResponse_PeerUpdate_Kind = 4 ) // 
Enum value maps for CoordinateResponse_PeerUpdate_Kind. @@ -37,12 +38,14 @@ var ( 1: "NODE", 2: "DISCONNECTED", 3: "LOST", + 4: "READY_FOR_HANDSHAKE", } CoordinateResponse_PeerUpdate_Kind_value = map[string]int32{ - "KIND_UNSPECIFIED": 0, - "NODE": 1, - "DISCONNECTED": 2, - "LOST": 3, + "KIND_UNSPECIFIED": 0, + "NODE": 1, + "DISCONNECTED": 2, + "LOST": 3, + "READY_FOR_HANDSHAKE": 4, } ) @@ -291,10 +294,11 @@ type CoordinateRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - UpdateSelf *CoordinateRequest_UpdateSelf `protobuf:"bytes,1,opt,name=update_self,json=updateSelf,proto3" json:"update_self,omitempty"` - Disconnect *CoordinateRequest_Disconnect `protobuf:"bytes,2,opt,name=disconnect,proto3" json:"disconnect,omitempty"` - AddTunnel *CoordinateRequest_Tunnel `protobuf:"bytes,3,opt,name=add_tunnel,json=addTunnel,proto3" json:"add_tunnel,omitempty"` - RemoveTunnel *CoordinateRequest_Tunnel `protobuf:"bytes,4,opt,name=remove_tunnel,json=removeTunnel,proto3" json:"remove_tunnel,omitempty"` + UpdateSelf *CoordinateRequest_UpdateSelf `protobuf:"bytes,1,opt,name=update_self,json=updateSelf,proto3" json:"update_self,omitempty"` + Disconnect *CoordinateRequest_Disconnect `protobuf:"bytes,2,opt,name=disconnect,proto3" json:"disconnect,omitempty"` + AddTunnel *CoordinateRequest_Tunnel `protobuf:"bytes,3,opt,name=add_tunnel,json=addTunnel,proto3" json:"add_tunnel,omitempty"` + RemoveTunnel *CoordinateRequest_Tunnel `protobuf:"bytes,4,opt,name=remove_tunnel,json=removeTunnel,proto3" json:"remove_tunnel,omitempty"` + ReadyForHandshake []*CoordinateRequest_ReadyForHandshake `protobuf:"bytes,5,rep,name=ready_for_handshake,json=readyForHandshake,proto3" json:"ready_for_handshake,omitempty"` } func (x *CoordinateRequest) Reset() { @@ -357,12 +361,20 @@ func (x *CoordinateRequest) GetRemoveTunnel() *CoordinateRequest_Tunnel { return nil } +func (x *CoordinateRequest) GetReadyForHandshake() []*CoordinateRequest_ReadyForHandshake { + if x != nil { + return x.ReadyForHandshake + } + return nil +} + type CoordinateResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields PeerUpdates []*CoordinateResponse_PeerUpdate `protobuf:"bytes,1,rep,name=peer_updates,json=peerUpdates,proto3" json:"peer_updates,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` } func (x *CoordinateResponse) Reset() { @@ -404,6 +416,13 @@ func (x *CoordinateResponse) GetPeerUpdates() []*CoordinateResponse_PeerUpdate { return nil } +func (x *CoordinateResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + type DERPMap_HomeParams struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -813,6 +832,57 @@ func (x *CoordinateRequest_Tunnel) GetId() []byte { return nil } +// ReadyForHandskales are sent from destinations back to the source, +// acknowledging receipt of the source's node. If the source starts pinging +// before a ReadyForHandshake, the Wireguard handshake will likely be +// dropped. 
+type CoordinateRequest_ReadyForHandshake struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *CoordinateRequest_ReadyForHandshake) Reset() { + *x = CoordinateRequest_ReadyForHandshake{} + if protoimpl.UnsafeEnabled { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CoordinateRequest_ReadyForHandshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CoordinateRequest_ReadyForHandshake) ProtoMessage() {} + +func (x *CoordinateRequest_ReadyForHandshake) ProtoReflect() protoreflect.Message { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CoordinateRequest_ReadyForHandshake.ProtoReflect.Descriptor instead. +func (*CoordinateRequest_ReadyForHandshake) Descriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{3, 3} +} + +func (x *CoordinateRequest_ReadyForHandshake) GetId() []byte { + if x != nil { + return x.Id + } + return nil +} + type CoordinateResponse_PeerUpdate struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -827,7 +897,7 @@ type CoordinateResponse_PeerUpdate struct { func (x *CoordinateResponse_PeerUpdate) Reset() { *x = CoordinateResponse_PeerUpdate{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -840,7 +910,7 @@ func (x *CoordinateResponse_PeerUpdate) String() string { func (*CoordinateResponse_PeerUpdate) ProtoMessage() {} func (x *CoordinateResponse_PeerUpdate) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -992,7 +1062,7 @@ var file_tailnet_proto_tailnet_proto_rawDesc = []byte{ 0x57, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xb2, 0x03, 0x0a, 0x11, 0x43, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xbe, 0x04, 0x0a, 0x11, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x73, 0x65, 0x6c, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, @@ -1013,50 +1083,62 @@ var file_tailnet_proto_tailnet_proto_rawDesc = []byte{ 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x52, - 0x0c, 0x72, 0x65, 
0x6d, 0x6f, 0x76, 0x65, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x1a, 0x38, 0x0a, - 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x65, 0x6c, 0x66, 0x12, 0x2a, 0x0a, 0x04, 0x6e, - 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x4e, 0x6f, 0x64, - 0x65, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x1a, 0x0c, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x63, 0x6f, - 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x1a, 0x18, 0x0a, 0x06, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x22, - 0xd9, 0x02, 0x0a, 0x12, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x75, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x63, - 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, + 0x0c, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x65, 0x0a, + 0x13, 0x72, 0x65, 0x61, 0x64, 0x79, 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x35, 0x2e, 0x63, 0x6f, 0x64, + 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, + 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x52, 0x65, 0x61, 0x64, 0x79, 0x46, 0x6f, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, + 0x65, 0x52, 0x11, 0x72, 0x65, 0x61, 0x64, 0x79, 0x46, 0x6f, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x1a, 0x38, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x65, + 0x6c, 0x66, 0x12, 0x2a, 0x0a, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, + 0x2e, 0x76, 0x32, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x1a, 0x0c, + 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x1a, 0x18, 0x0a, 0x06, + 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x23, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x64, 0x79, 0x46, + 0x6f, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x22, 0x88, 0x03, 0x0a, 0x12, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0b, 0x70, - 0x65, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x1a, 0xee, 0x01, 0x0a, 0x0a, 0x50, - 0x65, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x04, 0x6e, 0x6f, 0x64, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, - 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, - 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0e, 0x32, 0x34, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, - 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 
0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, - 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x2e, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, - 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x42, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x12, - 0x14, 0x0a, 0x10, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x44, 0x45, 0x10, 0x01, 0x12, - 0x10, 0x0a, 0x0c, 0x44, 0x49, 0x53, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, - 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x4f, 0x53, 0x54, 0x10, 0x03, 0x32, 0xbe, 0x01, 0x0a, 0x07, - 0x54, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x12, 0x56, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x73, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, - 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x30, 0x01, 0x12, - 0x5b, 0x0a, 0x0a, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, + 0x73, 0x65, 0x12, 0x52, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, + 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, + 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, + 0x65, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0b, 0x70, 0x65, 0x65, 0x72, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0x87, 0x02, 0x0a, + 0x0a, 0x50, 0x65, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x04, 0x6e, + 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x64, 0x65, + 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x4e, 0x6f, 0x64, + 0x65, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x34, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, + 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, + 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x2e, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, + 0x64, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x5b, 0x0a, 0x04, 0x4b, 0x69, 0x6e, + 0x64, 0x12, 0x14, 0x0a, 0x10, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x44, 0x45, 0x10, + 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x49, 0x53, 0x43, 0x4f, 0x4e, 
0x4e, 0x45, 0x43, 0x54, 0x45, + 0x44, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x4f, 0x53, 0x54, 0x10, 0x03, 0x12, 0x17, 0x0a, + 0x13, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x46, 0x4f, 0x52, 0x5f, 0x48, 0x41, 0x4e, 0x44, 0x53, + 0x48, 0x41, 0x4b, 0x45, 0x10, 0x04, 0x32, 0xbe, 0x01, 0x0a, 0x07, 0x54, 0x61, 0x69, 0x6c, 0x6e, + 0x65, 0x74, 0x12, 0x56, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x45, 0x52, 0x50, + 0x4d, 0x61, 0x70, 0x73, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, + 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x45, + 0x52, 0x50, 0x4d, 0x61, 0x70, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, - 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, - 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x29, 0x5a, 0x27, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, - 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, - 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x0a, 0x43, 0x6f, + 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, + 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, + 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, + 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, + 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, + 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1072,7 +1154,7 @@ func file_tailnet_proto_tailnet_proto_rawDescGZIP() []byte { } var file_tailnet_proto_tailnet_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_tailnet_proto_tailnet_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_tailnet_proto_tailnet_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_tailnet_proto_tailnet_proto_goTypes = []interface{}{ (CoordinateResponse_PeerUpdate_Kind)(0), // 0: coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind (*DERPMap)(nil), // 1: coder.tailnet.v2.DERPMap @@ -1090,35 +1172,37 @@ var file_tailnet_proto_tailnet_proto_goTypes = []interface{}{ (*CoordinateRequest_UpdateSelf)(nil), // 13: coder.tailnet.v2.CoordinateRequest.UpdateSelf (*CoordinateRequest_Disconnect)(nil), // 14: coder.tailnet.v2.CoordinateRequest.Disconnect (*CoordinateRequest_Tunnel)(nil), // 15: coder.tailnet.v2.CoordinateRequest.Tunnel - (*CoordinateResponse_PeerUpdate)(nil), // 16: coder.tailnet.v2.CoordinateResponse.PeerUpdate - (*timestamppb.Timestamp)(nil), // 17: google.protobuf.Timestamp + (*CoordinateRequest_ReadyForHandshake)(nil), // 16: 
coder.tailnet.v2.CoordinateRequest.ReadyForHandshake + (*CoordinateResponse_PeerUpdate)(nil), // 17: coder.tailnet.v2.CoordinateResponse.PeerUpdate + (*timestamppb.Timestamp)(nil), // 18: google.protobuf.Timestamp } var file_tailnet_proto_tailnet_proto_depIdxs = []int32{ 6, // 0: coder.tailnet.v2.DERPMap.home_params:type_name -> coder.tailnet.v2.DERPMap.HomeParams 8, // 1: coder.tailnet.v2.DERPMap.regions:type_name -> coder.tailnet.v2.DERPMap.RegionsEntry - 17, // 2: coder.tailnet.v2.Node.as_of:type_name -> google.protobuf.Timestamp + 18, // 2: coder.tailnet.v2.Node.as_of:type_name -> google.protobuf.Timestamp 11, // 3: coder.tailnet.v2.Node.derp_latency:type_name -> coder.tailnet.v2.Node.DerpLatencyEntry 12, // 4: coder.tailnet.v2.Node.derp_forced_websocket:type_name -> coder.tailnet.v2.Node.DerpForcedWebsocketEntry 13, // 5: coder.tailnet.v2.CoordinateRequest.update_self:type_name -> coder.tailnet.v2.CoordinateRequest.UpdateSelf 14, // 6: coder.tailnet.v2.CoordinateRequest.disconnect:type_name -> coder.tailnet.v2.CoordinateRequest.Disconnect 15, // 7: coder.tailnet.v2.CoordinateRequest.add_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel 15, // 8: coder.tailnet.v2.CoordinateRequest.remove_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel - 16, // 9: coder.tailnet.v2.CoordinateResponse.peer_updates:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate - 9, // 10: coder.tailnet.v2.DERPMap.HomeParams.region_score:type_name -> coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry - 10, // 11: coder.tailnet.v2.DERPMap.Region.nodes:type_name -> coder.tailnet.v2.DERPMap.Region.Node - 7, // 12: coder.tailnet.v2.DERPMap.RegionsEntry.value:type_name -> coder.tailnet.v2.DERPMap.Region - 3, // 13: coder.tailnet.v2.CoordinateRequest.UpdateSelf.node:type_name -> coder.tailnet.v2.Node - 3, // 14: coder.tailnet.v2.CoordinateResponse.PeerUpdate.node:type_name -> coder.tailnet.v2.Node - 0, // 15: coder.tailnet.v2.CoordinateResponse.PeerUpdate.kind:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind - 2, // 16: coder.tailnet.v2.Tailnet.StreamDERPMaps:input_type -> coder.tailnet.v2.StreamDERPMapsRequest - 4, // 17: coder.tailnet.v2.Tailnet.Coordinate:input_type -> coder.tailnet.v2.CoordinateRequest - 1, // 18: coder.tailnet.v2.Tailnet.StreamDERPMaps:output_type -> coder.tailnet.v2.DERPMap - 5, // 19: coder.tailnet.v2.Tailnet.Coordinate:output_type -> coder.tailnet.v2.CoordinateResponse - 18, // [18:20] is the sub-list for method output_type - 16, // [16:18] is the sub-list for method input_type - 16, // [16:16] is the sub-list for extension type_name - 16, // [16:16] is the sub-list for extension extendee - 0, // [0:16] is the sub-list for field type_name + 16, // 9: coder.tailnet.v2.CoordinateRequest.ready_for_handshake:type_name -> coder.tailnet.v2.CoordinateRequest.ReadyForHandshake + 17, // 10: coder.tailnet.v2.CoordinateResponse.peer_updates:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate + 9, // 11: coder.tailnet.v2.DERPMap.HomeParams.region_score:type_name -> coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry + 10, // 12: coder.tailnet.v2.DERPMap.Region.nodes:type_name -> coder.tailnet.v2.DERPMap.Region.Node + 7, // 13: coder.tailnet.v2.DERPMap.RegionsEntry.value:type_name -> coder.tailnet.v2.DERPMap.Region + 3, // 14: coder.tailnet.v2.CoordinateRequest.UpdateSelf.node:type_name -> coder.tailnet.v2.Node + 3, // 15: coder.tailnet.v2.CoordinateResponse.PeerUpdate.node:type_name -> coder.tailnet.v2.Node + 0, // 16: 
coder.tailnet.v2.CoordinateResponse.PeerUpdate.kind:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind + 2, // 17: coder.tailnet.v2.Tailnet.StreamDERPMaps:input_type -> coder.tailnet.v2.StreamDERPMapsRequest + 4, // 18: coder.tailnet.v2.Tailnet.Coordinate:input_type -> coder.tailnet.v2.CoordinateRequest + 1, // 19: coder.tailnet.v2.Tailnet.StreamDERPMaps:output_type -> coder.tailnet.v2.DERPMap + 5, // 20: coder.tailnet.v2.Tailnet.Coordinate:output_type -> coder.tailnet.v2.CoordinateResponse + 19, // [19:21] is the sub-list for method output_type + 17, // [17:19] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_tailnet_proto_tailnet_proto_init() } @@ -1260,6 +1344,18 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CoordinateRequest_ReadyForHandshake); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CoordinateResponse_PeerUpdate); i { case 0: return &v.state @@ -1278,7 +1374,7 @@ func file_tailnet_proto_tailnet_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_tailnet_proto_tailnet_proto_rawDesc, NumEnums: 1, - NumMessages: 16, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/tailnet/proto/tailnet.proto b/tailnet/proto/tailnet.proto index 83445e7579..1e948ebac6 100644 --- a/tailnet/proto/tailnet.proto +++ b/tailnet/proto/tailnet.proto @@ -68,6 +68,15 @@ message CoordinateRequest { } Tunnel add_tunnel = 3; Tunnel remove_tunnel = 4; + + // ReadyForHandskales are sent from destinations back to the source, + // acknowledging receipt of the source's node. If the source starts pinging + // before a ReadyForHandshake, the Wireguard handshake will likely be + // dropped. + message ReadyForHandshake { + bytes id = 1; + } + repeated ReadyForHandshake ready_for_handshake = 5; } message CoordinateResponse { @@ -80,12 +89,14 @@ message CoordinateResponse { NODE = 1; DISCONNECTED = 2; LOST = 3; + READY_FOR_HANDSHAKE = 4; } Kind kind = 3; string reason = 4; } repeated PeerUpdate peer_updates = 1; + string error = 2; } service Tailnet { diff --git a/tailnet/tailnettest/coordinateemock.go b/tailnet/tailnettest/coordinateemock.go index 51f2dd2bce..c06243685a 100644 --- a/tailnet/tailnettest/coordinateemock.go +++ b/tailnet/tailnettest/coordinateemock.go @@ -14,6 +14,7 @@ import ( tailnet "github.com/coder/coder/v2/tailnet" proto "github.com/coder/coder/v2/tailnet/proto" + uuid "github.com/google/uuid" gomock "go.uber.org/mock/gomock" ) @@ -64,6 +65,18 @@ func (mr *MockCoordinateeMockRecorder) SetNodeCallback(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNodeCallback", reflect.TypeOf((*MockCoordinatee)(nil).SetNodeCallback), arg0) } +// SetTunnelDestination mocks base method. +func (m *MockCoordinatee) SetTunnelDestination(arg0 uuid.UUID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTunnelDestination", arg0) +} + +// SetTunnelDestination indicates an expected call of SetTunnelDestination. 
+func (mr *MockCoordinateeMockRecorder) SetTunnelDestination(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTunnelDestination", reflect.TypeOf((*MockCoordinatee)(nil).SetTunnelDestination), arg0) +} + // UpdatePeers mocks base method. func (m *MockCoordinatee) UpdatePeers(arg0 []*proto.CoordinateResponse_PeerUpdate) error { m.ctrl.T.Helper() diff --git a/tailnet/tunnel.go b/tailnet/tunnel.go index bc5becbc94..68b78d4f92 100644 --- a/tailnet/tunnel.go +++ b/tailnet/tunnel.go @@ -52,6 +52,10 @@ func (c ClientCoordinateeAuth) Authorize(req *proto.CoordinateRequest) error { } } + if rfh := req.GetReadyForHandshake(); rfh != nil { + return xerrors.Errorf("clients may not send ready_for_handshake") + } + return nil } @@ -147,6 +151,12 @@ func (s *tunnelStore) findTunnelPeers(id uuid.UUID) []uuid.UUID { return out } +func (s *tunnelStore) tunnelExists(src, dst uuid.UUID) bool { + _, srcOK := s.bySrc[src][dst] + _, dstOK := s.byDst[src][dst] + return srcOK || dstOK +} + func (s *tunnelStore) htmlDebug() []HTMLTunnel { out := make([]HTMLTunnel, 0) for src, dsts := range s.bySrc { diff --git a/tailnet/tunnel_internal_test.go b/tailnet/tunnel_internal_test.go index 3ba7cc4165..b05871f086 100644 --- a/tailnet/tunnel_internal_test.go +++ b/tailnet/tunnel_internal_test.go @@ -43,3 +43,18 @@ func TestTunnelStore_RemoveAll(t *testing.T) { require.Len(t, uut.findTunnelPeers(p2), 0) require.Len(t, uut.findTunnelPeers(p3), 0) } + +func TestTunnelStore_TunnelExists(t *testing.T) { + t.Parallel() + p1 := uuid.UUID{1} + p2 := uuid.UUID{2} + uut := newTunnelStore() + require.False(t, uut.tunnelExists(p1, p2)) + require.False(t, uut.tunnelExists(p2, p1)) + uut.add(p1, p2) + require.True(t, uut.tunnelExists(p1, p2)) + require.True(t, uut.tunnelExists(p2, p1)) + uut.remove(p1, p2) + require.False(t, uut.tunnelExists(p1, p2)) + require.False(t, uut.tunnelExists(p2, p1)) +}
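
Illustrative sketch (not part of the diff): the client-side call order this patch assumes. Creating the Conn and the DRPC Coordinate stream happens elsewhere (e.g. in the workspacesdk connector); only the use of the new SetTunnelDestination / READY_FOR_HANDSHAKE flow is shown, and the dialAgent helper below is hypothetical.

package example

import (
	"cdr.dev/slog"
	"github.com/google/uuid"

	"github.com/coder/coder/v2/tailnet"
	"github.com/coder/coder/v2/tailnet/proto"
)

// dialAgent is a hypothetical helper showing the intended ordering on the
// client (tunnel source) side under the new ack flow.
func dialAgent(logger slog.Logger, conn *tailnet.Conn,
	protocol proto.DRPCTailnet_CoordinateClient, agentID uuid.UUID) tailnet.Coordination {
	// Mark the agent as a tunnel destination first, so configMaps withholds
	// its node from wireguard until the agent's READY_FOR_HANDSHAKE arrives
	// (or the 5-second readyForHandshakeTimeout fires).
	conn.SetTunnelDestination(agentID)

	// NewRemoteCoordination sends the AddTunnel request and starts the
	// response loop. Because a tunnel target is set, this side never sends
	// acks itself; only the agent (which coordinates with no target) replies
	// with ReadyForHandshake for each NODE update it receives.
	coordination := tailnet.NewRemoteCoordination(logger, protocol, conn, agentID)

	// Once the coordinator relays the agent's ack back as a
	// READY_FOR_HANDSHAKE peer update, the node is programmed with keepalives
	// enabled and the client may begin pinging.
	return coordination
}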