chore: do network integration tests in isolated net ns

This commit is contained in:
Dean Sheather 2024-05-01 16:54:02 +00:00
parent 3ff9cef498
commit 85bb13d142
6 changed files with 569 additions and 245 deletions

View File

@ -498,10 +498,14 @@ func (c *configMaps) setAllPeersLost() {
lc.setLostTimer(c)
// it's important to drop a log here so that we see it get marked lost if grepping thru
// the logs for a specific peer
keyID := "(nil node)"
if lc.node != nil {
keyID = lc.node.Key.ShortString()
}
c.logger.Debug(context.Background(),
"setAllPeersLost marked peer lost",
slog.F("peer_id", lc.peerID),
slog.F("key_id", lc.node.Key.ShortString()),
slog.F("key_id", keyID),
)
}
}

View File

@ -93,6 +93,9 @@ type Options struct {
BlockEndpoints bool
Logger slog.Logger
ListenPort uint16
// ForceNetworkUp forces the network to be considered up. magicsock will not
// do anything if it thinks it can't reach the internet.
ForceNetworkUp bool
}
// NodeID creates a Tailscale NodeID from the last 8 bytes of a UUID. It ensures
@ -171,6 +174,9 @@ func NewConn(options *Options) (conn *Conn, err error) {
if options.DERPHeader != nil {
magicConn.SetDERPHeader(options.DERPHeader.Clone())
}
if options.ForceNetworkUp {
magicConn.SetNetworkUp(true)
}
if v, ok := os.LookupEnv(EnvMagicsockDebugLogging); ok {
vBool, err := strconv.ParseBool(v)

View File

@ -1,67 +1,162 @@
//go:build linux
// +build linux
package integration
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/netip"
"strings"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"tailscale.com/derp"
"tailscale.com/derp/derphttp"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
)
// NetworkSetupDefault is a no-op network setup hook: it accepts the test handle
// and leaves the process's networking untouched.
func NetworkSetupDefault(*testing.T) {}
// IDs used in tests. They are fixed, well-known UUIDs so that separate
// processes can refer to the same two clients without exchanging state.
var (
	// Client1ID identifies the first client.
	Client1ID = uuid.MustParse("00000000-0000-0000-0000-000000000001")
	// Client2ID identifies the second client.
	Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
)
func DERPMapTailscale(ctx context.Context, t *testing.T) *tailcfg.DERPMap {
ctx, cancel := context.WithTimeout(ctx, testutil.WaitShort)
defer cancel()
type TestTopology struct {
Name string
// SetupNetworking creates interfaces and network namespaces for the test.
// The simplest implementation is SetupNetworkingLoopback, which only creates
// a single network namespace shared by all test processes.
SetupNetworking func(t *testing.T, log slog.Logger) TestNetworking
req, err := http.NewRequestWithContext(ctx, "GET", "https://controlplane.tailscale.com/derpmap/default", nil)
require.NoError(t, err)
// StartServer gets called in the server subprocess. It's expected to start
// the coordinator server in the background and return.
StartServer func(t *testing.T, log slog.Logger, listenAddr string)
// StartClient gets called in each client subprocess. It's expected to
// create the tailnet.Conn and ensure connectivity to its peer.
StartClient func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
dm := &tailcfg.DERPMap{}
dec := json.NewDecoder(res.Body)
err = dec.Decode(dm)
require.NoError(t, err)
return dm
// RunTests is the main test function. It's called in each of the client
// subprocesses. If tests can only run once, they should check the client ID
// and return early if it's not the expected one.
RunTests func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn)
}
func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string) {
coord = tailnet.NewCoordinator(logger)
// TestNetworking describes the network layout produced by a topology's
// networking setup: where the server listens, how each client reaches it, and
// the per-process networking settings for the three subprocesses.
type TestNetworking struct {
	// ServerListenAddr is the IP address and port that the server listens on,
	// passed to StartServer.
	ServerListenAddr string
	// ServerAccessURLClient1 is the hostname and port that the first client
	// uses to access the server.
	ServerAccessURLClient1 string
	// ServerAccessURLClient2 is the hostname and port that the second client
	// uses to access the server.
	ServerAccessURLClient2 string

	// Networking settings for each subprocess.
	ProcessServer  TestNetworkingProcess
	ProcessClient1 TestNetworkingProcess
	ProcessClient2 TestNetworkingProcess
}
// TestNetworkingProcess holds the networking settings for a single test
// subprocess.
type TestNetworkingProcess struct {
	// NetNS to enter. If zero, the current network namespace is used.
	// The value is a file descriptor number referring to the namespace.
	NetNSFd int
}
// SetupNetworkingLoopback builds the simplest topology: one freshly created
// network namespace, used by the server and both clients, in which the server
// is reached over the loopback address. The namespace file is closed when the
// test finishes.
func SetupNetworkingLoopback(t *testing.T, log slog.Logger) TestNetworking {
	// A random suffix keeps concurrently running tests from colliding on the
	// namespace name.
	suffix, err := cryptorand.String(4)
	require.NoError(t, err, "generate random string for netns name")

	// One namespace for every process gives us an isolated loopback
	// interface that all of them share.
	nsFile, err := createNetNS("codertest_netns_" + suffix)
	require.NoError(t, err, "create network namespace")
	t.Cleanup(func() {
		_ = nsFile.Close()
	})

	const listenAddr = "127.0.0.1:8080"
	accessURL := "http://" + listenAddr
	shared := TestNetworkingProcess{
		NetNSFd: int(nsFile.Fd()),
	}

	return TestNetworking{
		ServerListenAddr:       listenAddr,
		ServerAccessURLClient1: accessURL,
		ServerAccessURLClient2: accessURL,
		ProcessServer:          shared,
		ProcessClient1:         shared,
		ProcessClient2:         shared,
	}
}
func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
coord := tailnet.NewCoordinator(logger)
var coordPtr atomic.Pointer[tailnet.Coordinator]
coordPtr.Store(&coord)
t.Cleanup(func() { _ = coord.Close() })
csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap {
return dm
return &tailcfg.DERPMap{
// Clients will set their own based on their custom access URL.
Regions: map[int]*tailcfg.DERPRegion{},
}
})
require.NoError(t, err)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
idStr := strings.TrimPrefix(r.URL.Path, "/")
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp")))
derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer))
t.Cleanup(derpCloseFunc)
r := chi.NewRouter()
r.Use(
func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Debug(r.Context(), "start "+r.Method, slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
next.ServeHTTP(w, r)
})
},
tracing.StatusWriterMiddleware,
httpmw.Logger(logger),
)
r.Route("/derp", func(r chi.Router) {
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
logger.Info(r.Context(), "start derp request", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
derpHandler.ServeHTTP(w, r)
})
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
})
r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
idStr := chi.URLParamFromCtx(ctx, "id")
id, err := uuid.Parse(idStr)
if err != nil {
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
logger.Warn(ctx, "bad agent ID passed in URL params", slog.F("id_str", idStr), slog.Error(err))
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
Message: "Bad agent id.",
Detail: err.Error(),
})
@ -70,14 +165,15 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
conn, err := websocket.Accept(w, r, nil)
if err != nil {
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
logger.Warn(ctx, "failed to accept websocket", slog.Error(err))
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}
ctx, wsNetConn := codersdk.WebsocketNetConn(r.Context(), conn, websocket.MessageBinary)
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close()
err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{
@ -86,43 +182,105 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
Auth: tailnet.SingleTailnetCoordinateeAuth{},
})
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
logger.Warn(ctx, "failed to serve conn", slog.Error(err))
_ = conn.Close(websocket.StatusInternalError, err.Error())
return
}
}))
t.Cleanup(srv.Close)
})
return coord, srv.URL
// We have a custom listen address.
srv := http.Server{
Addr: listenAddr,
Handler: r,
ReadTimeout: 10 * time.Second,
}
serveDone := make(chan struct{})
go func() {
defer close(serveDone)
err := srv.ListenAndServe()
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
t.Error("HTTP server error:", err)
}
}()
t.Cleanup(func() {
_ = srv.Close()
<-serveDone
})
}
func TailnetSetupDRPC(ctx context.Context, t *testing.T, logger slog.Logger,
id, agentID uuid.UUID,
coordinateURL string,
dm *tailcfg.DERPMap,
) *tailnet.Conn {
ip := tailnet.IPFromUUID(id)
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
DERPMap: dm,
Logger: logger,
})
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })
func basicDERPMap(t *testing.T, serverURL *url.URL) *tailcfg.DERPMap {
portStr := serverURL.Port()
port, err := strconv.Atoi(portStr)
require.NoError(t, err, "parse server port")
//nolint:bodyclose
ws, _, err := websocket.Dial(ctx, coordinateURL+"/"+id.String(), nil)
hostname := serverURL.Hostname()
ipv4 := ""
ip, err := netip.ParseAddr(hostname)
if err == nil {
hostname = ""
ipv4 = ip.String()
}
return &tailcfg.DERPMap{
Regions: map[int]*tailcfg.DERPRegion{
1: {
RegionID: 1,
RegionCode: "test",
RegionName: "test server",
Nodes: []*tailcfg.DERPNode{
{
Name: "test0",
RegionID: 1,
HostName: hostname,
IPv4: ipv4,
IPv6: "none",
DERPPort: port,
ForceHTTP: true,
InsecureForTests: true,
},
},
},
},
}
}
func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn {
u, err := serverURL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", myID.String()))
require.NoError(t, err)
//nolint:bodyclose
ws, _, err := websocket.Dial(context.Background(), u.String(), nil)
require.NoError(t, err)
t.Cleanup(func() {
_ = ws.Close(websocket.StatusNormalClosure, "closing websocket")
})
client, err := tailnet.NewDRPCClient(
websocket.NetConn(ctx, ws, websocket.MessageBinary),
websocket.NetConn(context.Background(), ws, websocket.MessageBinary),
logger,
)
require.NoError(t, err)
coord, err := client.Coordinate(ctx)
coord, err := client.Coordinate(context.Background())
require.NoError(t, err)
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, agentID)
t.Cleanup(func() { _ = coordination.Close() })
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
DERPMap: basicDERPMap(t, serverURL),
BlockEndpoints: true,
Logger: logger,
// These tests don't have internet connection, so we need to force
// magicsock to do anything.
ForceNetworkUp: true,
})
require.NoError(t, err)
t.Cleanup(func() {
_ = conn.Close()
})
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, peerID)
t.Cleanup(func() {
_ = coordination.Close()
})
return conn
}

View File

@ -1,194 +0,0 @@
package integration
import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"strconv"
"syscall"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"tailscale.com/tailcfg"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
)
// Command-line flags that are set when the test binary re-executes itself as a
// child process; execChild appends them to the original arguments.
var (
	// isChild marks this process as the child half of a test.
	isChild = flag.Bool("child", false, "Run tests as a child")
	// childTestID is the index into tests of the test the child should run.
	childTestID = flag.Int("child-test-id", 0, "Which test is being run")
	// childCoordinateURL is the coordinator URL the child connects back to.
	childCoordinateURL = flag.String("child-coordinate-url", "", "The coordinate url to connect back to")
	// childAgentID is the UUID (as a string) the child uses as its own ID.
	childAgentID = flag.String("child-agent-id", "", "The agent id of the child")
)
// TestMain gates the whole package: the tests only run when the
// CODER_TAILNET_TESTS environment variable is non-empty and the process is
// running as root. Otherwise a short notice is printed and TestMain returns
// without running anything.
func TestMain(m *testing.M) {
	switch {
	case os.Getenv("CODER_TAILNET_TESTS") == "":
		_, _ = fmt.Println("skipping tests...")
		return
	case os.Getuid() != 0:
		_, _ = fmt.Println("networking integration tests must run as root")
		return
	}

	// Flags are parsed here because a custom TestMain is responsible for it.
	flag.Parse()
	exitCode := m.Run()
	os.Exit(exitCode)
}
// tests is the table of integration scenarios. The index of an entry is what
// gets passed to the child process via --child-test-id, so parent and child
// look up the same scenario.
var tests = []Test{{
	Name:        "Normal",
	DERPMap:     DERPMapTailscale,
	Coordinator: CoordinatorInMemory,
	Parent: Parent{
		NetworkSetup: NetworkSetupDefault,
		TailnetSetup: TailnetSetupDRPC,
		// The parent passes if the child's tailnet IP becomes reachable.
		Run: func(ctx context.Context, t *testing.T, opts ParentOpts) {
			reach := opts.Conn.AwaitReachable(ctx, tailnet.IPFromUUID(opts.AgentID))
			assert.True(t, reach)
		},
	},
	Child: Child{
		NetworkSetup: NetworkSetupDefault,
		TailnetSetup: TailnetSetupDRPC,
		Run: func(ctx context.Context, t *testing.T, opts ChildOpts) {
			// wait until the parent kills us
			// (receiving from a fresh channel that nobody sends on blocks forever)
			<-make(chan struct{})
		},
	},
}}
// TestIntegration runs every entry of tests. It plays one of two roles
// depending on the --child flag:
//
//   - child: look up the test by --child-test-id, set up networking and a
//     tailnet connection, then hand control to the test's Child.Run.
//   - parent (default): for each test, start the coordinator, spawn the child
//     process, set up the parent's own tailnet connection, run Parent.Run, and
//     finally interrupt the child and wait for it to exit.
//
//nolint:paralleltest
func TestIntegration(t *testing.T) {
	if *isChild {
		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
		t.Cleanup(cancel)

		agentID, err := uuid.Parse(*childAgentID)
		require.NoError(t, err)

		// Fail with a readable message instead of an index-out-of-range
		// panic if the flag does not name a known test.
		require.Truef(t, *childTestID >= 0 && *childTestID < len(tests),
			"child test id %d out of range", *childTestID)
		test := tests[*childTestID]
		test.Child.NetworkSetup(t)
		dm := test.DERPMap(ctx, t)
		conn := test.Child.TailnetSetup(ctx, t, logger, agentID, uuid.Nil, *childCoordinateURL, dm)
		test.Child.Run(ctx, t, ChildOpts{
			Logger:  logger,
			Conn:    conn,
			AgentID: agentID,
		})
		return
	}

	for id, test := range tests {
		t.Run(test.Name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
			t.Cleanup(cancel)

			logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
			parentID, childID := uuid.New(), uuid.New()
			dm := test.DERPMap(ctx, t)
			_, coordURL := test.Coordinator(t, logger, dm)

			child, waitChild := execChild(ctx, id, coordURL, childID)
			test.Parent.NetworkSetup(t)
			conn := test.Parent.TailnetSetup(ctx, t, logger, parentID, childID, coordURL, dm)
			test.Parent.Run(ctx, t, ParentOpts{
				Logger:   logger,
				Conn:     conn,
				ClientID: parentID,
				AgentID:  childID,
			})

			// Process is nil when the child was never started; in that case
			// there is nothing to signal. The command is bound to ctx, so the
			// wait below is bounded either way.
			if child.Process != nil {
				if err := child.Process.Signal(syscall.SIGINT); err != nil {
					t.Logf("signal child: %v", err)
				}
			}
			<-waitChild
		})
	}
}
// Test describes one integration scenario: the shared DERP map and
// coordinator, plus the parent-side and child-side hooks.
type Test struct {
	// Name is the name of the test.
	Name string
	// DERPMap returns the DERP map to use for both the parent and child. It is
	// called once at the beginning of the test.
	DERPMap func(ctx context.Context, t *testing.T) *tailcfg.DERPMap
	// Coordinator returns a running tailnet coordinator, and the url to reach
	// it on.
	Coordinator func(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string)

	// Parent holds the hooks executed in the parent test process.
	Parent Parent
	// Child holds the hooks executed in the spawned child process.
	Child Child
}
// Parent is the struct containing all of the parent specific configurations.
// Functions are invoked in order of struct definition.
type Parent struct {
	// NetworkSetup is run before all test code. It can be used to setup
	// networking scenarios.
	NetworkSetup func(t *testing.T)
	// TailnetSetup creates a tailnet network.
	TailnetSetup func(
		ctx context.Context, t *testing.T, logger slog.Logger,
		id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap,
	) *tailnet.Conn

	// Run runs the parent's part of the test, using the connection returned
	// by TailnetSetup (passed in opts).
	Run func(ctx context.Context, t *testing.T, opts ParentOpts)
}
// Child is the struct containing all of the child specific configurations.
// Functions are invoked in order of struct definition.
type Child struct {
	// NetworkSetup is run before all test code. It can be used to setup
	// networking scenarios.
	NetworkSetup func(t *testing.T)
	// TailnetSetup creates a tailnet network. It has the same signature as
	// Parent.TailnetSetup.
	TailnetSetup func(
		ctx context.Context, t *testing.T, logger slog.Logger,
		id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap,
	) *tailnet.Conn

	// Run runs the actual test. Parents and children run in separate processes,
	// so it's important to ensure no communication happens over memory between
	// run functions of parents and children.
	Run func(ctx context.Context, t *testing.T, opts ChildOpts)
}
// ParentOpts is the set of values handed to Parent.Run.
type ParentOpts struct {
	// Logger is the test logger.
	Logger slog.Logger
	// Conn is the tailnet connection created by Parent.TailnetSetup.
	Conn *tailnet.Conn
	// ClientID is the parent's own ID.
	ClientID uuid.UUID
	// AgentID is the ID assigned to the child process.
	AgentID uuid.UUID
}
// ChildOpts is the set of values handed to Child.Run.
type ChildOpts struct {
	// Logger is the test logger.
	Logger slog.Logger
	// Conn is the tailnet connection created by Child.TailnetSetup.
	Conn *tailnet.Conn
	// AgentID is the child's own ID, parsed from the --child-agent-id flag.
	AgentID uuid.UUID
}
// execChild re-executes the current test binary as a child process, passing
// through the original arguments plus the --child flags that identify the test
// to run, the coordinator URL and the child's agent ID. The child shares this
// process's stdout and stderr and is killed when ctx is done.
//
// It returns the command and a channel that yields exactly one value: the
// error from starting the process, or otherwise the result of waiting for it
// to exit.
func execChild(ctx context.Context, testID int, coordURL string, agentID uuid.UUID) (*exec.Cmd, <-chan error) {
	// Buffered so the result can always be delivered, even if the caller
	// never receives; this keeps the waiting goroutine from leaking.
	ch := make(chan error, 1)

	binary := os.Args[0]
	// Copy the arguments so appending never writes into os.Args' backing array.
	args := append([]string(nil), os.Args[1:]...)
	args = append(args,
		"--child=true",
		"--child-test-id="+strconv.Itoa(testID),
		"--child-coordinate-url="+coordURL,
		"--child-agent-id="+agentID.String(),
	)

	cmd := exec.CommandContext(ctx, binary, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Start synchronously so that cmd.Process is populated by the time the
	// caller gets the command back and tries to signal it.
	if err := cmd.Start(); err != nil {
		ch <- err
		return cmd, ch
	}
	go func() {
		ch <- cmd.Wait()
	}()
	return cmd, ch
}

View File

@ -0,0 +1,295 @@
//go:build linux
// +build linux
package integration_test
import (
"flag"
"fmt"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/test/integration"
"github.com/coder/coder/v2/testutil"
)
// runTestEnv is the environment variable that must be set to a non-empty value
// for TestMain to run the tests in this package.
const runTestEnv = "CODER_TAILNET_TESTS"
// Command-line flags used when the test binary is re-executed as a server or
// client subprocess. The parent builds them in startServerSubprocess and
// startClientSubprocess.
var (
	isSubprocess = flag.Bool("subprocess", false, "Signifies that this is a test subprocess")
	// testID holds the topology name to run (flag name: --test-name).
	testID = flag.String("test-name", "", "Which test is being run")
	role   = flag.String("role", "", "The role of the test subprocess: server, client")

	// Role: server
	serverListenAddr = flag.String("server-listen-addr", "", "The address to listen on for the server")

	// Role: client
	clientName      = flag.String("client-name", "", "The name of the client for logs")
	clientServerURL = flag.String("client-server-url", "", "The url to connect to the server")
	clientMyID      = flag.String("client-id", "", "The id of the client")
	clientPeerID    = flag.String("client-peer-id", "", "The id of the other client")
	clientRunTests  = flag.Bool("client-run-tests", false, "Run the tests in the client subprocess")
)
// TestMain gates the whole package. If the runTestEnv environment variable is
// empty the tests are skipped (TestMain returns without running them). Once
// opted in, the tests require Linux and root; violating either is treated as a
// failure and the process exits with status 1.
func TestMain(m *testing.M) {
	if run := os.Getenv(runTestEnv); run == "" {
		_, _ = fmt.Printf("skipping tests as %q is not set...\n", runTestEnv)
		return
	}
	if runtime.GOOS != "linux" {
		// Terminate the line so the message is not glued to later output.
		_, _ = fmt.Printf("GOOS %q is not linux\n", runtime.GOOS)
		os.Exit(1)
	}
	if os.Getuid() != 0 {
		_, _ = fmt.Println("UID is not 0")
		os.Exit(1)
	}

	// A custom TestMain is responsible for parsing flags before m.Run.
	flag.Parse()
	os.Exit(m.Run())
}
// topologies is the list of network topologies exercised by TestIntegration.
// Subprocesses look their topology up in this list by Name.
var topologies = []integration.TestTopology{
	{
		Name:            "BasicLoopback",
		SetupNetworking: integration.SetupNetworkingLoopback,
		StartServer:     integration.StartServerBasic,
		StartClient:     integration.StartClientBasic,
		RunTests: func(t *testing.T, log slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID, conn *tailnet.Conn) {
			// Test basic connectivity
			// by pinging the peer's tailnet IP, which is derived from its ID.
			peerIP := tailnet.IPFromUUID(peerID)
			_, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP)
			require.NoError(t, err, "ping peer")
		},
	},
}
// TestIntegration is the entry point for both the orchestrating parent and
// its subprocesses. When --subprocess is set it delegates to
// handleTestSubprocess. Otherwise, for every topology, it sets up networking,
// launches a server and two clients as subprocesses, waits for the first
// client (which runs the tests) to exit, and then shuts down the second client
// and the server, requiring each to exit without error.
//
//nolint:paralleltest
func TestIntegration(t *testing.T) {
	if *isSubprocess {
		handleTestSubprocess(t)
		return
	}

	for _, topo := range topologies {
		//nolint:paralleltest
		t.Run(topo.Name, func(t *testing.T) {
			logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
			netCfg := topo.SetupNetworking(t, logger)

			// Launch the server followed by both clients. Only client1 is
			// told to run the tests, so it exits on its own and its close
			// function is not needed here.
			serverDone, stopServer := startServerSubprocess(t, topo.Name, netCfg)
			firstClientDone, _ := startClientSubprocess(t, topo.Name, netCfg, 1)
			secondClientDone, stopSecondClient := startClientSubprocess(t, topo.Name, netCfg, 2)

			// The test outcome is client1's exit status.
			require.NoError(t, <-firstClientDone)

			// Tear down in order: the remaining client first, then the server.
			stopSecondClient()
			require.NoError(t, <-secondClientDone)
			stopServer()
			require.NoError(t, <-serverDone)
		})
	}
}
// handleTestSubprocess runs the subprocess side of TestIntegration. It looks
// up the topology named by --test-name and then acts according to --role:
//
//   - "server": starts the server and waits for SIGTERM/SIGINT.
//   - "client": waits for the server to answer, starts the client, and then
//     either runs the topology's tests and returns (--client-run-tests) or
//     waits for SIGTERM/SIGINT.
//
// Any other role fails the test immediately.
func handleTestSubprocess(t *testing.T) {
	// Find the specific topology.
	var topo integration.TestTopology
	for _, candidate := range topologies {
		if candidate.Name == *testID {
			topo = candidate
			break
		}
	}
	require.NotEmptyf(t, topo.Name, "unknown test topology %q", *testID)

	testName := topo.Name + "/"
	if *role == "server" {
		testName += "server"
	} else {
		testName += *clientName
	}

	//nolint:paralleltest
	t.Run(testName, func(t *testing.T) {
		log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		switch *role {
		case "server":
			log = log.Named("server")
			topo.StartServer(t, log, *serverListenAddr)
			// no exit

		case "client":
			log = log.Named(*clientName)
			serverURL, err := url.Parse(*clientServerURL)
			require.NoErrorf(t, err, "parse server url %q", *clientServerURL)
			myID, err := uuid.Parse(*clientMyID)
			require.NoErrorf(t, err, "parse client id %q", *clientMyID)
			peerID, err := uuid.Parse(*clientPeerID)
			require.NoErrorf(t, err, "parse peer id %q", *clientPeerID)

			waitForServerAvailable(t, serverURL)

			conn := topo.StartClient(t, log, serverURL, myID, peerID)

			if *clientRunTests {
				topo.RunTests(t, log, serverURL, myID, peerID, conn)
				// and exit
				return
			}

		default:
			// Fail fast rather than idling until a signal arrives.
			t.Fatalf("unknown subprocess role %q", *role)
		}

		// Wait for signals.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
		<-signals
	})
}
// waitForServerAvailable polls the server's /derp/latency-check endpoint until
// it answers with 200 OK. Each request is limited to 2 seconds, attempts are
// spaced 100ms apart, and the test is failed if no attempt succeeds within 30
// seconds.
func waitForServerAvailable(t *testing.T, serverURL *url.URL) {
	const (
		pollInterval = 100 * time.Millisecond
		perRequest   = 2 * time.Second
		timeout      = 30 * time.Second
	)

	httpClient := http.Client{
		Timeout: perRequest,
	}
	probeURL, err := url.Parse(serverURL.String() + "/derp/latency-check")
	require.NoError(t, err)
	target := probeURL.String()

	began := time.Now()
	for time.Since(began) < timeout {
		//nolint:noctx
		resp, err := httpClient.Get(target)
		switch {
		case err != nil:
			t.Logf("waiting for server to be available: %v", err)
		default:
			_ = resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return
			}
			t.Logf("waiting for server to be available: got status %d", resp.StatusCode)
		}
		// Pause between attempts, including after the final failed one.
		time.Sleep(pollInterval)
	}

	t.Fatalf("server did not become available after %v", timeout)
}
// startServerSubprocess launches the server role of the given topology as a
// subprocess, using the server's network namespace and listen address from
// networking. It returns whatever startSubprocess returns: the exit-status
// channel and a function that stops the process.
func startServerSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking) (<-chan error, func()) {
	serverFlags := []string{
		"--subprocess",
		"--test-name=" + topologyName,
		"--role=server",
		"--server-listen-addr=" + networking.ServerListenAddr,
	}
	nsFd := networking.ProcessServer.NetNSFd
	return startSubprocess(t, nsFd, serverFlags)
}
// startClientSubprocess launches client number 1 or 2 of the given topology as
// a subprocess. Each client gets its own ID, the other client's ID as its
// peer, its own server access URL, and its own network namespace from
// networking. Only client 1 is told to run the tests.
//
// It returns the exit-status channel and stop function from startSubprocess.
func startClientSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking, clientNumber int) (<-chan error, func()) {
	require.True(t, clientNumber == 1 || clientNumber == 2)

	// Defaults describe client 1; client 2 overrides them below.
	var (
		name      = fmt.Sprintf("client%d", clientNumber)
		myID      = integration.Client1ID
		peerID    = integration.Client2ID
		accessURL = networking.ServerAccessURLClient1
		netNSFd   = networking.ProcessClient1.NetNSFd
	)
	if clientNumber == 2 {
		myID, peerID = peerID, myID
		accessURL = networking.ServerAccessURLClient2
		// Client 2 must enter its own namespace, not client 1's.
		netNSFd = networking.ProcessClient2.NetNSFd
	}

	flags := []string{
		"--subprocess",
		"--test-name=" + topologyName,
		"--role=client",
		"--client-name=" + name,
		"--client-server-url=" + accessURL,
		"--client-id=" + myID.String(),
		"--client-peer-id=" + peerID.String(),
	}
	if clientNumber == 1 {
		flags = append(flags, "--client-run-tests")
	}

	return startSubprocess(t, netNSFd, flags)
}
// startSubprocess re-executes the current test binary with the original
// arguments plus flags. If netNSFd is greater than zero the binary is run
// through nsenter so that it starts inside that network namespace. The child
// shares this process's stdout/stderr and receives SIGTERM if the parent dies.
//
// It returns a channel that yields the process's exit error once and is then
// closed, and a function that stops the process: SIGTERM first, then a kill if
// it has not exited after 5 seconds. A test cleanup stops the process if it is
// still running when the test ends.
func startSubprocess(t *testing.T, netNSFd int, flags []string) (<-chan error, func()) {
	name := os.Args[0]
	// Copy the arguments so appending never writes into os.Args' backing array.
	args := append([]string(nil), os.Args[1:]...)
	args = append(args, flags...)

	var extraFiles []*os.File
	if netNSFd > 0 {
		// We use nsenter to enter the namespace.
		// We can't use `setns` easily from Golang in the parent process because
		// you can't execute the syscall in the forked child thread before it
		// execs.
		// We can't use `setns` easily from Golang in the child process because
		// by the time you call it, the process has already created multiple
		// threads.
		// The first entry of ExtraFiles becomes fd 3 in the child.
		args = append([]string{"--net=/proc/self/fd/3", name}, args...)
		name = "nsenter"

		// Wrap a duplicate of the descriptor rather than the descriptor
		// itself: an *os.File closes its fd when it is closed or garbage
		// collected, and netNSFd belongs to the caller.
		dupFd, err := syscall.Dup(netNSFd)
		require.NoError(t, err, "duplicate network namespace fd")
		syscall.CloseOnExec(dupFd)
		nsFile := os.NewFile(uintptr(dupFd), "netns")
		// The child has inherited its copy once Start returns, so the
		// duplicate can be released when this function exits.
		defer func() { _ = nsFile.Close() }()
		extraFiles = []*os.File{nsFile}
	}

	cmd := exec.Command(name, args...)
	cmd.ExtraFiles = extraFiles
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Pdeathsig: syscall.SIGTERM,
	}
	err := cmd.Start()
	require.NoError(t, err)

	// Buffered so the waiter can always deliver the result and exit.
	waitErr := make(chan error, 1)
	go func() {
		err := cmd.Wait()
		waitErr <- err
		close(waitErr)
	}()

	closeFn := func() {
		_ = cmd.Process.Signal(syscall.SIGTERM)
		select {
		case <-time.After(5 * time.Second):
			_ = cmd.Process.Kill()
		case <-waitErr:
			return
		}
		<-waitErr
	}

	t.Cleanup(func() {
		select {
		case err := <-waitErr:
			if err != nil {
				// Pass the error as an argument so a '%' in its text is
				// not interpreted as a formatting verb.
				t.Logf("subprocess exited: %v", err)
			}
			return
		default:
		}

		closeFn()
	})

	return waitErr, closeFn
}

View File

@ -0,0 +1,55 @@
//go:build linux
// +build linux
package integration
import (
"fmt"
"os"
"os/exec"
"golang.org/x/xerrors"
)
// createNetNS creates a new network namespace with the given name. The returned
// file is a file descriptor to the network namespace.
//
// The loopback interface is brought up inside the namespace, and the named
// entry is removed again before returning, so on success the namespace is kept
// alive only by the returned file. On failure the file (if opened) is closed
// and the named namespace is removed on a best-effort basis.
func createNetNS(name string) (*os.File, error) {
	// We use ip-netns here because it handles the process of creating a
	// disowned netns for us.
	// The only way to create a network namespace is by calling unshare(2) or
	// clone(2) with the CLONE_NEWNET flag, and as soon as the last process in a
	// network namespace exits, the namespace is destroyed.
	// However, if you create a bind mount of /proc/$PID/ns/net to a file, it
	// will keep the namespace alive until the mount is removed.
	// ip-netns does this for us. Without it, we would have to fork anyways.
	// Later, we will use nsenter to enter this network namespace.
	//nolint:gosec
	err := exec.Command("ip", "netns", "add", name).Run()
	if err != nil {
		return nil, xerrors.Errorf("create network namespace via ip-netns: %w", err)
	}

	// removeNamed undoes "ip netns add" on error paths. It is best-effort:
	// the original error is the one worth reporting.
	removeNamed := func() {
		//nolint:gosec
		_ = exec.Command("ip", "netns", "delete", name).Run()
	}

	// Open /run/netns/$name to get a file descriptor to the network namespace
	// so it stays active after we soft-delete it.
	path := fmt.Sprintf("/run/netns/%s", name)
	file, err := os.OpenFile(path, os.O_RDONLY, 0)
	if err != nil {
		removeNamed()
		return nil, xerrors.Errorf("open network namespace file %q: %w", path, err)
	}

	// Exec "ip link set lo up" in the namespace to bring up loopback
	// networking.
	//nolint:gosec
	err = exec.Command("ip", "netns", "exec", name, "ip", "link", "set", "lo", "up").Run()
	if err != nil {
		_ = file.Close()
		removeNamed()
		return nil, xerrors.Errorf("bring up loopback interface in network namespace: %w", err)
	}

	// Remove the network namespace. The kernel will keep it around until the
	// file descriptor is closed.
	//nolint:gosec
	err = exec.Command("ip", "netns", "delete", name).Run()
	if err != nil {
		// The delete itself failed, so only the descriptor can be released.
		_ = file.Close()
		return nil, xerrors.Errorf("soft delete network namespace via ip-netns: %w", err)
	}

	return file, nil
}