mirror of https://github.com/coder/coder.git
Merge 85bb13d142
into 93d8812284
This commit is contained in:
commit
89af5bdd8d
|
@ -498,10 +498,14 @@ func (c *configMaps) setAllPeersLost() {
|
|||
lc.setLostTimer(c)
|
||||
// it's important to drop a log here so that we see it get marked lost if grepping thru
|
||||
// the logs for a specific peer
|
||||
keyID := "(nil node)"
|
||||
if lc.node != nil {
|
||||
keyID = lc.node.Key.ShortString()
|
||||
}
|
||||
c.logger.Debug(context.Background(),
|
||||
"setAllPeersLost marked peer lost",
|
||||
slog.F("peer_id", lc.peerID),
|
||||
slog.F("key_id", lc.node.Key.ShortString()),
|
||||
slog.F("key_id", keyID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,6 +93,9 @@ type Options struct {
|
|||
BlockEndpoints bool
|
||||
Logger slog.Logger
|
||||
ListenPort uint16
|
||||
// ForceNetworkUp forces the network to be considered up. magicsock will not
|
||||
// do anything if it thinks it can't reach the internet.
|
||||
ForceNetworkUp bool
|
||||
}
|
||||
|
||||
// NodeID creates a Tailscale NodeID from the last 8 bytes of a UUID. It ensures
|
||||
|
@ -171,6 +174,9 @@ func NewConn(options *Options) (conn *Conn, err error) {
|
|||
if options.DERPHeader != nil {
|
||||
magicConn.SetDERPHeader(options.DERPHeader.Clone())
|
||||
}
|
||||
if options.ForceNetworkUp {
|
||||
magicConn.SetNetworkUp(true)
|
||||
}
|
||||
|
||||
if v, ok := os.LookupEnv(EnvMagicsockDebugLogging); ok {
|
||||
vBool, err := strconv.ParseBool(v)
|
||||
|
|
|
@ -1,67 +1,162 @@
|
|||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/netip"
|
||||
"strings"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
"nhooyr.io/websocket"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/derp/derphttp"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/key"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/coder/v2/coderd/httpapi"
|
||||
"github.com/coder/coder/v2/coderd/httpmw"
|
||||
"github.com/coder/coder/v2/coderd/tracing"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/cryptorand"
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// NetworkSetupDefault is a no-op network setup hook: it performs no
// configuration at all and ignores the test handle it is given.
func NetworkSetupDefault(_ *testing.T) {}
|
||||
// IDs used in tests.
|
||||
var (
|
||||
Client1ID = uuid.MustParse("00000000-0000-0000-0000-000000000001")
|
||||
Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
|
||||
)
|
||||
|
||||
func DERPMapTailscale(ctx context.Context, t *testing.T) *tailcfg.DERPMap {
|
||||
ctx, cancel := context.WithTimeout(ctx, testutil.WaitShort)
|
||||
defer cancel()
|
||||
type TestTopology struct {
|
||||
Name string
|
||||
// SetupNetworking creates interfaces and network namespaces for the test.
|
||||
// The most simple implementation is NetworkSetupDefault, which only creates
|
||||
// a network namespace shared for all tests.
|
||||
SetupNetworking func(t *testing.T, log slog.Logger) TestNetworking
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", "https://controlplane.tailscale.com/derpmap/default", nil)
|
||||
require.NoError(t, err)
|
||||
// StartServer gets called in the server subprocess. It's expected to start
|
||||
// the coordinator server in the background and return.
|
||||
StartServer func(t *testing.T, log slog.Logger, listenAddr string)
|
||||
// StartClient gets called in each client subprocess. It's expected to
|
||||
// create the tailnet.Conn and ensure connectivity to its peer.
|
||||
StartClient func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
defer res.Body.Close()
|
||||
|
||||
dm := &tailcfg.DERPMap{}
|
||||
dec := json.NewDecoder(res.Body)
|
||||
err = dec.Decode(dm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return dm
|
||||
// RunTests is the main test function. It's called in each of the client
|
||||
// subprocesses. If tests can only run once, they should check the client ID
|
||||
// and return early if it's not the expected one.
|
||||
RunTests func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn)
|
||||
}
|
||||
|
||||
func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string) {
|
||||
coord = tailnet.NewCoordinator(logger)
|
||||
// TestNetworking is the result of a topology's SetupNetworking step: where
// the server listens, how each client reaches it, and the per-process
// networking settings for the server and both clients.
type TestNetworking struct {
	// ServerListenAddr is the "ip:port" the server binds to. It is the value
	// handed to StartServer.
	ServerListenAddr string

	// ServerAccessURLClient1 and ServerAccessURLClient2 are the addresses the
	// first and second client, respectively, use to reach the server.
	ServerAccessURLClient1 string
	ServerAccessURLClient2 string

	// Per-subprocess networking settings.
	ProcessServer  TestNetworkingProcess
	ProcessClient1 TestNetworkingProcess
	ProcessClient2 TestNetworkingProcess
}

// TestNetworkingProcess holds the networking settings of one subprocess.
type TestNetworkingProcess struct {
	// NetNSFd is the file descriptor of the network namespace to enter. A
	// zero value means "stay in the current network namespace".
	NetNSFd int
}
|
||||
|
||||
func SetupNetworkingLoopback(t *testing.T, log slog.Logger) TestNetworking {
|
||||
netNSName := "codertest_netns_"
|
||||
randStr, err := cryptorand.String(4)
|
||||
require.NoError(t, err, "generate random string for netns name")
|
||||
netNSName += randStr
|
||||
|
||||
// Create a single network namespace for all tests so we can have an
|
||||
// isolated loopback interface.
|
||||
netNSFile, err := createNetNS(netNSName)
|
||||
require.NoError(t, err, "create network namespace")
|
||||
t.Cleanup(func() {
|
||||
_ = netNSFile.Close()
|
||||
})
|
||||
|
||||
var (
|
||||
listenAddr = "127.0.0.1:8080"
|
||||
process = TestNetworkingProcess{
|
||||
NetNSFd: int(netNSFile.Fd()),
|
||||
}
|
||||
)
|
||||
return TestNetworking{
|
||||
ServerListenAddr: listenAddr,
|
||||
ServerAccessURLClient1: "http://" + listenAddr,
|
||||
ServerAccessURLClient2: "http://" + listenAddr,
|
||||
ProcessServer: process,
|
||||
ProcessClient1: process,
|
||||
ProcessClient2: process,
|
||||
}
|
||||
}
|
||||
|
||||
func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
|
||||
coord := tailnet.NewCoordinator(logger)
|
||||
var coordPtr atomic.Pointer[tailnet.Coordinator]
|
||||
coordPtr.Store(&coord)
|
||||
t.Cleanup(func() { _ = coord.Close() })
|
||||
|
||||
csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap {
|
||||
return dm
|
||||
return &tailcfg.DERPMap{
|
||||
// Clients will set their own based on their custom access URL.
|
||||
Regions: map[int]*tailcfg.DERPRegion{},
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
idStr := strings.TrimPrefix(r.URL.Path, "/")
|
||||
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp")))
|
||||
derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer))
|
||||
t.Cleanup(derpCloseFunc)
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Use(
|
||||
func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
logger.Debug(r.Context(), "start "+r.Method, slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
},
|
||||
tracing.StatusWriterMiddleware,
|
||||
httpmw.Logger(logger),
|
||||
)
|
||||
r.Route("/derp", func(r chi.Router) {
|
||||
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
logger.Info(r.Context(), "start derp request", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
|
||||
derpHandler.ServeHTTP(w, r)
|
||||
})
|
||||
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
})
|
||||
r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
idStr := chi.URLParamFromCtx(ctx, "id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
if err != nil {
|
||||
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
|
||||
logger.Warn(ctx, "bad agent ID passed in URL params", slog.F("id_str", idStr), slog.Error(err))
|
||||
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Bad agent id.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
|
@ -70,14 +165,15 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
|
|||
|
||||
conn, err := websocket.Accept(w, r, nil)
|
||||
if err != nil {
|
||||
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
|
||||
logger.Warn(ctx, "failed to accept websocket", slog.Error(err))
|
||||
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Failed to accept websocket.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
ctx, wsNetConn := codersdk.WebsocketNetConn(r.Context(), conn, websocket.MessageBinary)
|
||||
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
|
||||
defer wsNetConn.Close()
|
||||
|
||||
err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{
|
||||
|
@ -86,43 +182,105 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
|
|||
Auth: tailnet.SingleTailnetCoordinateeAuth{},
|
||||
})
|
||||
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
|
||||
logger.Warn(ctx, "failed to serve conn", slog.Error(err))
|
||||
_ = conn.Close(websocket.StatusInternalError, err.Error())
|
||||
return
|
||||
}
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
})
|
||||
|
||||
return coord, srv.URL
|
||||
// We have a custom listen address.
|
||||
srv := http.Server{
|
||||
Addr: listenAddr,
|
||||
Handler: r,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
}
|
||||
serveDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(serveDone)
|
||||
err := srv.ListenAndServe()
|
||||
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
|
||||
t.Error("HTTP server error:", err)
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
_ = srv.Close()
|
||||
<-serveDone
|
||||
})
|
||||
}
|
||||
|
||||
func TailnetSetupDRPC(ctx context.Context, t *testing.T, logger slog.Logger,
|
||||
id, agentID uuid.UUID,
|
||||
coordinateURL string,
|
||||
dm *tailcfg.DERPMap,
|
||||
) *tailnet.Conn {
|
||||
ip := tailnet.IPFromUUID(id)
|
||||
conn, err := tailnet.NewConn(&tailnet.Options{
|
||||
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
|
||||
DERPMap: dm,
|
||||
Logger: logger,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = conn.Close() })
|
||||
func basicDERPMap(t *testing.T, serverURL *url.URL) *tailcfg.DERPMap {
|
||||
portStr := serverURL.Port()
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err, "parse server port")
|
||||
|
||||
//nolint:bodyclose
|
||||
ws, _, err := websocket.Dial(ctx, coordinateURL+"/"+id.String(), nil)
|
||||
hostname := serverURL.Hostname()
|
||||
ipv4 := ""
|
||||
ip, err := netip.ParseAddr(hostname)
|
||||
if err == nil {
|
||||
hostname = ""
|
||||
ipv4 = ip.String()
|
||||
}
|
||||
|
||||
return &tailcfg.DERPMap{
|
||||
Regions: map[int]*tailcfg.DERPRegion{
|
||||
1: {
|
||||
RegionID: 1,
|
||||
RegionCode: "test",
|
||||
RegionName: "test server",
|
||||
Nodes: []*tailcfg.DERPNode{
|
||||
{
|
||||
Name: "test0",
|
||||
RegionID: 1,
|
||||
HostName: hostname,
|
||||
IPv4: ipv4,
|
||||
IPv6: "none",
|
||||
DERPPort: port,
|
||||
ForceHTTP: true,
|
||||
InsecureForTests: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn {
|
||||
u, err := serverURL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", myID.String()))
|
||||
require.NoError(t, err)
|
||||
//nolint:bodyclose
|
||||
ws, _, err := websocket.Dial(context.Background(), u.String(), nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
_ = ws.Close(websocket.StatusNormalClosure, "closing websocket")
|
||||
})
|
||||
|
||||
client, err := tailnet.NewDRPCClient(
|
||||
websocket.NetConn(ctx, ws, websocket.MessageBinary),
|
||||
websocket.NetConn(context.Background(), ws, websocket.MessageBinary),
|
||||
logger,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
coord, err := client.Coordinate(ctx)
|
||||
coord, err := client.Coordinate(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, agentID)
|
||||
t.Cleanup(func() { _ = coordination.Close() })
|
||||
conn, err := tailnet.NewConn(&tailnet.Options{
|
||||
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
|
||||
DERPMap: basicDERPMap(t, serverURL),
|
||||
BlockEndpoints: true,
|
||||
Logger: logger,
|
||||
// These tests don't have an internet connection, so we need to force
|
||||
// magicsock to treat the network as up; otherwise it won't do anything.
|
||||
ForceNetworkUp: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
_ = conn.Close()
|
||||
})
|
||||
|
||||
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, peerID)
|
||||
t.Cleanup(func() {
|
||||
_ = coordination.Close()
|
||||
})
|
||||
|
||||
return conn
|
||||
}
|
||||
|
|
|
@ -1,194 +0,0 @@
|
|||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"tailscale.com/tailcfg"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// Flags the test binary passes to itself when it re-executes as a child.
var (
	isChild            = flag.Bool("child", false, "Run tests as a child")
	childTestID        = flag.Int("child-test-id", 0, "Which test is being run")
	childCoordinateURL = flag.String("child-coordinate-url", "", "The coordinate url to connect back to")
	childAgentID       = flag.String("child-agent-id", "", "The agent id of the child")
)

// TestMain gates the whole package: the tests only run when
// CODER_TAILNET_TESTS is set and the process is running as root. Otherwise a
// note is printed and TestMain returns without running anything.
func TestMain(m *testing.M) {
	switch {
	case os.Getenv("CODER_TAILNET_TESTS") == "":
		_, _ = fmt.Println("skipping tests...")
		return
	case os.Getuid() != 0:
		_, _ = fmt.Println("networking integration tests must run as root")
		return
	}

	flag.Parse()
	os.Exit(m.Run())
}
|
||||
|
||||
var tests = []Test{{
|
||||
Name: "Normal",
|
||||
DERPMap: DERPMapTailscale,
|
||||
Coordinator: CoordinatorInMemory,
|
||||
Parent: Parent{
|
||||
NetworkSetup: NetworkSetupDefault,
|
||||
TailnetSetup: TailnetSetupDRPC,
|
||||
Run: func(ctx context.Context, t *testing.T, opts ParentOpts) {
|
||||
reach := opts.Conn.AwaitReachable(ctx, tailnet.IPFromUUID(opts.AgentID))
|
||||
assert.True(t, reach)
|
||||
},
|
||||
},
|
||||
Child: Child{
|
||||
NetworkSetup: NetworkSetupDefault,
|
||||
TailnetSetup: TailnetSetupDRPC,
|
||||
Run: func(ctx context.Context, t *testing.T, opts ChildOpts) {
|
||||
// wait until the parent kills us
|
||||
<-make(chan struct{})
|
||||
},
|
||||
},
|
||||
}}
|
||||
|
||||
//nolint:paralleltest
|
||||
func TestIntegration(t *testing.T) {
|
||||
if *isChild {
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
agentID, err := uuid.Parse(*childAgentID)
|
||||
require.NoError(t, err)
|
||||
|
||||
test := tests[*childTestID]
|
||||
test.Child.NetworkSetup(t)
|
||||
dm := test.DERPMap(ctx, t)
|
||||
conn := test.Child.TailnetSetup(ctx, t, logger, agentID, uuid.Nil, *childCoordinateURL, dm)
|
||||
test.Child.Run(ctx, t, ChildOpts{
|
||||
Logger: logger,
|
||||
Conn: conn,
|
||||
AgentID: agentID,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
for id, test := range tests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
parentID, childID := uuid.New(), uuid.New()
|
||||
dm := test.DERPMap(ctx, t)
|
||||
_, coordURL := test.Coordinator(t, logger, dm)
|
||||
|
||||
child, waitChild := execChild(ctx, id, coordURL, childID)
|
||||
test.Parent.NetworkSetup(t)
|
||||
conn := test.Parent.TailnetSetup(ctx, t, logger, parentID, childID, coordURL, dm)
|
||||
test.Parent.Run(ctx, t, ParentOpts{
|
||||
Logger: logger,
|
||||
Conn: conn,
|
||||
ClientID: parentID,
|
||||
AgentID: childID,
|
||||
})
|
||||
child.Process.Signal(syscall.SIGINT)
|
||||
<-waitChild
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type Test struct {
|
||||
// Name is the name of the test.
|
||||
Name string
|
||||
|
||||
// DERPMap returns the DERP map to use for both the parent and child. It is
|
||||
// called once at the beginning of the test.
|
||||
DERPMap func(ctx context.Context, t *testing.T) *tailcfg.DERPMap
|
||||
// Coordinator returns a running tailnet coordinator, and the url to reach
|
||||
// it on.
|
||||
Coordinator func(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string)
|
||||
|
||||
Parent Parent
|
||||
Child Child
|
||||
}
|
||||
|
||||
// Parent is the struct containing all of the parent specific configurations.
|
||||
// Functions are invoked in order of struct definition.
|
||||
type Parent struct {
|
||||
// NetworkSetup is run before all test code. It can be used to setup
|
||||
// networking scenarios.
|
||||
NetworkSetup func(t *testing.T)
|
||||
|
||||
// TailnetSetup creates a tailnet network.
|
||||
TailnetSetup func(
|
||||
ctx context.Context, t *testing.T, logger slog.Logger,
|
||||
id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap,
|
||||
) *tailnet.Conn
|
||||
|
||||
Run func(ctx context.Context, t *testing.T, opts ParentOpts)
|
||||
}
|
||||
|
||||
// Child is the struct containing all of the child specific configurations.
|
||||
// Functions are invoked in order of struct definition.
|
||||
type Child struct {
|
||||
// NetworkSetup is run before all test code. It can be used to setup
|
||||
// networking scenarios.
|
||||
NetworkSetup func(t *testing.T)
|
||||
|
||||
// TailnetSetup creates a tailnet network.
|
||||
TailnetSetup func(
|
||||
ctx context.Context, t *testing.T, logger slog.Logger,
|
||||
id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap,
|
||||
) *tailnet.Conn
|
||||
|
||||
// Run runs the actual test. Parents and children run in separate processes,
|
||||
// so it's important to ensure no communication happens over memory between
|
||||
// run functions of parents and children.
|
||||
Run func(ctx context.Context, t *testing.T, opts ChildOpts)
|
||||
}
|
||||
|
||||
type ParentOpts struct {
|
||||
Logger slog.Logger
|
||||
Conn *tailnet.Conn
|
||||
ClientID uuid.UUID
|
||||
AgentID uuid.UUID
|
||||
}
|
||||
|
||||
type ChildOpts struct {
|
||||
Logger slog.Logger
|
||||
Conn *tailnet.Conn
|
||||
AgentID uuid.UUID
|
||||
}
|
||||
|
||||
func execChild(ctx context.Context, testID int, coordURL string, agentID uuid.UUID) (*exec.Cmd, <-chan error) {
|
||||
ch := make(chan error)
|
||||
binary := os.Args[0]
|
||||
args := os.Args[1:]
|
||||
args = append(args,
|
||||
"--child=true",
|
||||
"--child-test-id="+strconv.Itoa(testID),
|
||||
"--child-coordinate-url="+coordURL,
|
||||
"--child-agent-id="+agentID.String(),
|
||||
)
|
||||
|
||||
cmd := exec.CommandContext(ctx, binary, args...)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
go func() {
|
||||
ch <- cmd.Run()
|
||||
}()
|
||||
return cmd, ch
|
||||
}
|
|
@ -0,0 +1,295 @@
|
|||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package integration_test
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/tailnet/test/integration"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// runTestEnv is the environment variable that must be set (to any non-empty
// value) for the integration tests to run.
const runTestEnv = "CODER_TAILNET_TESTS"

// Flags the test binary passes to itself when it re-executes as a subprocess.
var (
	isSubprocess = flag.Bool("subprocess", false, "Signifies that this is a test subprocess")
	testID       = flag.String("test-name", "", "Which test is being run")
	role         = flag.String("role", "", "The role of the test subprocess: server, client")

	// Role: server
	serverListenAddr = flag.String("server-listen-addr", "", "The address to listen on for the server")

	// Role: client
	clientName      = flag.String("client-name", "", "The name of the client for logs")
	clientServerURL = flag.String("client-server-url", "", "The url to connect to the server")
	clientMyID      = flag.String("client-id", "", "The id of the client")
	clientPeerID    = flag.String("client-peer-id", "", "The id of the other client")
	clientRunTests  = flag.Bool("client-run-tests", false, "Run the tests in the client subprocess")
)

// TestMain gates the whole package. When runTestEnv is unset it prints a note
// and returns without running any test. Once the tests are opted into, a
// non-linux GOOS or a non-root UID makes the process exit with status 1;
// otherwise the flags are parsed and the tests run.
func TestMain(m *testing.M) {
	if os.Getenv(runTestEnv) == "" {
		_, _ = fmt.Printf("skipping tests as %q is not set...\n", runTestEnv)
		return
	}
	if runtime.GOOS != "linux" {
		_, _ = fmt.Printf("GOOS %q is not linux\n", runtime.GOOS)
		os.Exit(1)
	}
	if os.Getuid() != 0 {
		_, _ = fmt.Println("UID is not 0")
		os.Exit(1)
	}

	flag.Parse()
	os.Exit(m.Run())
}
|
||||
|
||||
var topologies = []integration.TestTopology{
|
||||
{
|
||||
Name: "BasicLoopback",
|
||||
SetupNetworking: integration.SetupNetworkingLoopback,
|
||||
StartServer: integration.StartServerBasic,
|
||||
StartClient: integration.StartClientBasic,
|
||||
RunTests: func(t *testing.T, log slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID, conn *tailnet.Conn) {
|
||||
// Test basic connectivity
|
||||
peerIP := tailnet.IPFromUUID(peerID)
|
||||
_, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP)
|
||||
require.NoError(t, err, "ping peer")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//nolint:paralleltest
|
||||
func TestIntegration(t *testing.T) {
|
||||
if *isSubprocess {
|
||||
handleTestSubprocess(t)
|
||||
return
|
||||
}
|
||||
|
||||
for _, topo := range topologies {
|
||||
//nolint:paralleltest
|
||||
t.Run(topo.Name, func(t *testing.T) {
|
||||
log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
|
||||
networking := topo.SetupNetworking(t, log)
|
||||
|
||||
// Fork the three child processes.
|
||||
serverErrCh, closeServer := startServerSubprocess(t, topo.Name, networking)
|
||||
// client1 runs the tests.
|
||||
client1ErrCh, _ := startClientSubprocess(t, topo.Name, networking, 1)
|
||||
client2ErrCh, closeClient2 := startClientSubprocess(t, topo.Name, networking, 2)
|
||||
|
||||
// Wait for client1 to exit.
|
||||
require.NoError(t, <-client1ErrCh)
|
||||
|
||||
// Close client2 and the server.
|
||||
closeClient2()
|
||||
require.NoError(t, <-client2ErrCh)
|
||||
closeServer()
|
||||
require.NoError(t, <-serverErrCh)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func handleTestSubprocess(t *testing.T) {
|
||||
// Find the specific topology.
|
||||
var topo integration.TestTopology
|
||||
for _, t := range topologies {
|
||||
if t.Name == *testID {
|
||||
topo = t
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NotEmptyf(t, topo.Name, "unknown test topology %q", *testID)
|
||||
|
||||
testName := topo.Name + "/"
|
||||
if *role == "server" {
|
||||
testName += "server"
|
||||
} else {
|
||||
testName += *clientName
|
||||
}
|
||||
|
||||
//nolint:parralleltest
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
switch *role {
|
||||
case "server":
|
||||
log = log.Named("server")
|
||||
topo.StartServer(t, log, *serverListenAddr)
|
||||
// no exit
|
||||
|
||||
case "client":
|
||||
log = log.Named(*clientName)
|
||||
serverURL, err := url.Parse(*clientServerURL)
|
||||
require.NoErrorf(t, err, "parse server url %q", *clientServerURL)
|
||||
myID, err := uuid.Parse(*clientMyID)
|
||||
require.NoErrorf(t, err, "parse client id %q", *clientMyID)
|
||||
peerID, err := uuid.Parse(*clientPeerID)
|
||||
require.NoErrorf(t, err, "parse peer id %q", *clientPeerID)
|
||||
|
||||
waitForServerAvailable(t, serverURL)
|
||||
|
||||
conn := topo.StartClient(t, log, serverURL, myID, peerID)
|
||||
|
||||
if *clientRunTests {
|
||||
topo.RunTests(t, log, serverURL, myID, peerID, conn)
|
||||
// and exit
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for signals.
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
|
||||
<-signals
|
||||
})
|
||||
}
|
||||
|
||||
func waitForServerAvailable(t *testing.T, serverURL *url.URL) {
|
||||
const delay = 100 * time.Millisecond
|
||||
const reqTimeout = 2 * time.Second
|
||||
const timeout = 30 * time.Second
|
||||
client := http.Client{
|
||||
Timeout: reqTimeout,
|
||||
}
|
||||
|
||||
u, err := url.Parse(serverURL.String() + "/derp/latency-check")
|
||||
require.NoError(t, err)
|
||||
for start := time.Now(); time.Since(start) < timeout; time.Sleep(delay) {
|
||||
//nolint:noctx
|
||||
resp, err := client.Get(u.String())
|
||||
if err != nil {
|
||||
t.Logf("waiting for server to be available: %v", err)
|
||||
continue
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Logf("waiting for server to be available: got status %d", resp.StatusCode)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
t.Fatalf("server did not become available after %v", timeout)
|
||||
}
|
||||
|
||||
func startServerSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking) (<-chan error, func()) {
|
||||
return startSubprocess(t, networking.ProcessServer.NetNSFd, []string{
|
||||
"--subprocess",
|
||||
"--test-name=" + topologyName,
|
||||
"--role=server",
|
||||
"--server-listen-addr=" + networking.ServerListenAddr,
|
||||
})
|
||||
}
|
||||
|
||||
func startClientSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking, clientNumber int) (<-chan error, func()) {
|
||||
require.True(t, clientNumber == 1 || clientNumber == 2)
|
||||
|
||||
var (
|
||||
clientName = fmt.Sprintf("client%d", clientNumber)
|
||||
myID = integration.Client1ID
|
||||
peerID = integration.Client2ID
|
||||
accessURL = networking.ServerAccessURLClient1
|
||||
)
|
||||
if clientNumber == 2 {
|
||||
myID, peerID = peerID, myID
|
||||
accessURL = networking.ServerAccessURLClient2
|
||||
}
|
||||
|
||||
flags := []string{
|
||||
"--subprocess",
|
||||
"--test-name=" + topologyName,
|
||||
"--role=client",
|
||||
"--client-name=" + clientName,
|
||||
"--client-server-url=" + accessURL,
|
||||
"--client-id=" + myID.String(),
|
||||
"--client-peer-id=" + peerID.String(),
|
||||
}
|
||||
if clientNumber == 1 {
|
||||
flags = append(flags, "--client-run-tests")
|
||||
}
|
||||
|
||||
return startSubprocess(t, networking.ProcessClient1.NetNSFd, flags)
|
||||
}
|
||||
|
||||
func startSubprocess(t *testing.T, netNSFd int, flags []string) (<-chan error, func()) {
|
||||
name := os.Args[0]
|
||||
args := append(os.Args[1:], flags...)
|
||||
|
||||
if netNSFd > 0 {
|
||||
// We use nsenter to enter the namespace.
|
||||
// We can't use `setns` easily from Golang in the parent process because
|
||||
// you can't execute the syscall in the forked child thread before it
|
||||
// execs.
|
||||
// We can't use `setns` easily from Golang in the child process because
|
||||
// by the time you call it, the process has already created multiple
|
||||
// threads.
|
||||
args = append([]string{"--net=/proc/self/fd/3", name}, args...)
|
||||
name = "nsenter"
|
||||
}
|
||||
|
||||
cmd := exec.Command(name, args...)
|
||||
if netNSFd > 0 {
|
||||
cmd.ExtraFiles = []*os.File{os.NewFile(uintptr(netNSFd), "")}
|
||||
}
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||
Pdeathsig: syscall.SIGTERM,
|
||||
}
|
||||
err := cmd.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
waitErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := cmd.Wait()
|
||||
waitErr <- err
|
||||
close(waitErr)
|
||||
}()
|
||||
|
||||
closeFn := func() {
|
||||
_ = cmd.Process.Signal(syscall.SIGTERM)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
_ = cmd.Process.Kill()
|
||||
case <-waitErr:
|
||||
return
|
||||
}
|
||||
<-waitErr
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
select {
|
||||
case err := <-waitErr:
|
||||
if err != nil {
|
||||
t.Logf("subprocess exited: " + err.Error())
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
closeFn()
|
||||
})
|
||||
|
||||
return waitErr, closeFn
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// createNetNS creates a new network namespace with the given name. The returned
|
||||
// file is a file descriptor to the network namespace.
|
||||
func createNetNS(name string) (*os.File, error) {
|
||||
// We use ip-netns here because it handles the process of creating a
|
||||
// disowned netns for us.
|
||||
// The only way to create a network namespace is by calling unshare(2) or
|
||||
// clone(2) with the CLONE_NEWNET flag, and as soon as the last process in a
|
||||
// network namespace exits, the namespace is destroyed.
|
||||
// However, if you create a bind mount of /proc/$PID/ns/net to a file, it
|
||||
// will keep the namespace alive until the mount is removed.
|
||||
// ip-netns does this for us. Without it, we would have to fork anyways.
|
||||
// Later, we will use nsenter to enter this network namespace.
|
||||
err := exec.Command("ip", "netns", "add", name).Run()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("create network namespace via ip-netns: %w", err)
|
||||
}
|
||||
|
||||
// Open /run/netns/$name to get a file descriptor to the network namespace
|
||||
// so it stays active after we soft-delete it.
|
||||
path := fmt.Sprintf("/run/netns/%s", name)
|
||||
file, err := os.OpenFile(path, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("open network namespace file %q: %w", path, err)
|
||||
}
|
||||
|
||||
// Exec "ip link set lo up" in the namespace to bring up loopback
|
||||
// networking.
|
||||
//nolint:gosec
|
||||
err = exec.Command("ip", "netns", "exec", name, "ip", "link", "set", "lo", "up").Run()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("bring up loopback interface in network namespace: %w", err)
|
||||
}
|
||||
|
||||
// Remove the network namespace. The kernel will keep it around until the
|
||||
// file descriptor is closed.
|
||||
err = exec.Command("ip", "netns", "delete", name).Run()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("soft delete network namespace via ip-netns: %w", err)
|
||||
}
|
||||
|
||||
return file, nil
|
||||
}
|
Loading…
Reference in New Issue