chore: add nginx topology to tailnet tests (#13188)

This commit is contained in:
Colin Adler 2024-05-07 03:17:38 -05:00 committed by GitHub
parent 677be9aab2
commit 421c0d1242
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 253 additions and 167 deletions

View File

@ -410,6 +410,10 @@ jobs:
- name: Setup Go
uses: ./.github/actions/setup-go
# Used by some integration tests.
- name: Install Nginx
run: sudo apt-get update && sudo apt-get install -y nginx
- name: Run Tests
run: make test-tailnet-integration

View File

@ -34,7 +34,7 @@ func ProvisionerTypeValid[T ProvisionerType | string](pt T) error {
case string(ProvisionerTypeEcho), string(ProvisionerTypeTerraform):
return nil
default:
return fmt.Errorf("provisioner type '%s' is not supported", pt)
return xerrors.Errorf("provisioner type '%s' is not supported", pt)
}
}

View File

@ -611,7 +611,7 @@ func TestProvisionerd(t *testing.T) {
server := createProvisionerd(t, func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
// This is the dial out to Coderd, which in this unit test will always fail.
connectAttemptedClose.Do(func() { close(connectAttempted) })
return nil, fmt.Errorf("client connection always fails")
return nil, xerrors.New("client connection always fails")
}, provisionerd.LocalProvisioners{
"someprovisioner": createProvisionerClient(t, done, provisionerTestServer{}),
})

View File

@ -7,12 +7,18 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@ -41,7 +47,34 @@ var (
Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
)
type ServerOptions struct {
type TestTopology struct {
Name string
// SetupNetworking creates interfaces and network namespaces for the test.
// The most simple implementation is NetworkSetupDefault, which only creates
// a network namespace shared for all tests.
SetupNetworking func(t *testing.T, logger slog.Logger) TestNetworking
// Server is the server starter for the test. It is executed in the server
// subprocess.
Server ServerStarter
// StartClient gets called in each client subprocess. It's expected to
// create the tailnet.Conn and ensure connectivity to it's peer.
StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn
// RunTests is the main test function. It's called in each of the client
// subprocesses. If tests can only run once, they should check the client ID
// and return early if it's not the expected one.
RunTests func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn)
}
type ServerStarter interface {
// StartServer should start the server and return once it's listening. It
// should not block once it's listening. Cleanup should be handled by
// t.Cleanup.
StartServer(t *testing.T, logger slog.Logger, listenAddr string)
}
type SimpleServerOptions struct {
// FailUpgradeDERP will make the DERP server fail to handle the initial DERP
// upgrade in a way that causes the client to fallback to
// DERP-over-WebSocket fallback automatically.
@ -54,8 +87,10 @@ type ServerOptions struct {
DERPWebsocketOnly bool
}
var _ ServerStarter = SimpleServerOptions{}
//nolint:revive
func (o ServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux {
func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux {
coord := tailnet.NewCoordinator(logger)
var coordPtr atomic.Pointer[tailnet.Coordinator]
coordPtr.Store(&coord)
@ -157,6 +192,76 @@ func (o ServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux {
return r
}
func (o SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) {
srv := http.Server{
Addr: listenAddr,
Handler: o.Router(t, logger),
ReadTimeout: 10 * time.Second,
}
serveDone := make(chan struct{})
go func() {
defer close(serveDone)
err := srv.ListenAndServe()
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
t.Error("HTTP server error:", err)
}
}()
t.Cleanup(func() {
_ = srv.Close()
<-serveDone
})
}
type NGINXServerOptions struct {
SimpleServerOptions
}
var _ ServerStarter = NGINXServerOptions{}
func (o NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) {
host, nginxPortStr, err := net.SplitHostPort(listenAddr)
require.NoError(t, err)
nginxPort, err := strconv.Atoi(nginxPortStr)
require.NoError(t, err)
serverPort := nginxPort + 1
serverListenAddr := net.JoinHostPort(host, strconv.Itoa(serverPort))
o.SimpleServerOptions.StartServer(t, logger, serverListenAddr)
startNginx(t, nginxPortStr, serverListenAddr)
}
func startNginx(t *testing.T, listenPort, serverAddr string) {
cfg := `events {}
http {
server {
listen ` + listenPort + `;
server_name _;
location / {
proxy_pass http://` + serverAddr + `;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $server_name;
}
}
}
`
dir := t.TempDir()
cfgPath := filepath.Join(dir, "nginx.conf")
err := os.WriteFile(cfgPath, []byte(cfg), 0o600)
require.NoError(t, err)
// ExecBackground will handle cleanup.
_, _ = ExecBackground(t, "server.nginx", nil, "nginx", []string{"-c", cfgPath})
}
// StartClientDERP creates a client connection to the server for coordination
// and creates a tailnet.Conn which will only use DERP to connect to the peer.
func StartClientDERP(t *testing.T, logger slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID) *tailnet.Conn {
@ -296,3 +401,126 @@ func basicDERPMap(t *testing.T, serverURL *url.URL) *tailcfg.DERPMap {
},
}
}
// ExecBackground starts a subprocess with the given flags and returns a
// channel that will receive the error when the subprocess exits. The returned
// function can be used to close the subprocess.
//
// processName is used to identify the subprocess in logs.
//
// Optionally, a network namespace can be passed to run the subprocess in.
//
// Do not call close then wait on the channel. Use the returned value from the
// function instead in this case.
//
// Cleanup is handled automatically if you don't care about monitoring the
// process manually.
func ExecBackground(t *testing.T, processName string, netNS *os.File, name string, args []string) (<-chan error, func() error) {
if netNS != nil {
// We use nsenter to enter the namespace.
// We can't use `setns` easily from Golang in the parent process because
// you can't execute the syscall in the forked child thread before it
// execs.
// We can't use `setns` easily from Golang in the child process because
// by the time you call it, the process has already created multiple
// threads.
args = append([]string{"--net=/proc/self/fd/3", name}, args...)
name = "nsenter"
}
cmd := exec.Command(name, args...)
if netNS != nil {
cmd.ExtraFiles = []*os.File{netNS}
}
out := &testWriter{
name: processName,
t: t,
}
t.Cleanup(out.Flush)
cmd.Stdout = out
cmd.Stderr = out
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGTERM,
}
err := cmd.Start()
require.NoError(t, err)
waitErr := make(chan error, 1)
go func() {
err := cmd.Wait()
waitErr <- err
close(waitErr)
}()
closeFn := func() error {
_ = cmd.Process.Signal(syscall.SIGTERM)
select {
case <-time.After(5 * time.Second):
_ = cmd.Process.Kill()
case err := <-waitErr:
return err
}
return <-waitErr
}
t.Cleanup(func() {
select {
case err := <-waitErr:
if err != nil {
t.Logf("subprocess exited: " + err.Error())
}
return
default:
}
_ = closeFn()
})
return waitErr, closeFn
}
type testWriter struct {
mut sync.Mutex
name string
t *testing.T
capturedLines []string
}
func (w *testWriter) Write(p []byte) (n int, err error) {
w.mut.Lock()
defer w.mut.Unlock()
str := string(p)
split := strings.Split(str, "\n")
for _, s := range split {
if s == "" {
continue
}
// If a line begins with "\s*--- (PASS|FAIL)" or is just PASS or FAIL,
// then it's a test result line. We want to capture it and log it later.
trimmed := strings.TrimSpace(s)
if strings.HasPrefix(trimmed, "--- PASS") || strings.HasPrefix(trimmed, "--- FAIL") || trimmed == "PASS" || trimmed == "FAIL" {
// Also fail the test if we see a FAIL line.
if strings.Contains(trimmed, "FAIL") {
w.t.Errorf("subprocess logged test failure: %s: \t%s", w.name, s)
}
w.capturedLines = append(w.capturedLines, s)
continue
}
w.t.Logf("%s output: \t%s", w.name, s)
}
return len(p), nil
}
func (w *testWriter) Flush() {
w.mut.Lock()
defer w.mut.Unlock()
for _, s := range w.capturedLines {
w.t.Logf("%s output: \t%s", w.name, s)
}
w.capturedLines = nil
}

View File

@ -9,18 +9,14 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
@ -72,7 +68,7 @@ var topologies = []integration.TestTopology{
// Test that DERP over loopback works.
Name: "BasicLoopbackDERP",
SetupNetworking: integration.SetupNetworkingLoopback,
ServerOptions: integration.ServerOptions{},
Server: integration.SimpleServerOptions{},
StartClient: integration.StartClientDERP,
RunTests: integration.TestSuite,
},
@ -82,7 +78,7 @@ var topologies = []integration.TestTopology{
// masquerades the traffic.
Name: "EasyNATDERP",
SetupNetworking: integration.SetupNetworkingEasyNAT,
ServerOptions: integration.ServerOptions{},
Server: integration.SimpleServerOptions{},
StartClient: integration.StartClientDERP,
RunTests: integration.TestSuite,
},
@ -92,7 +88,7 @@ var topologies = []integration.TestTopology{
// client 2.
Name: "EasyNATDirect",
SetupNetworking: integration.SetupNetworkingEasyNAT,
ServerOptions: integration.ServerOptions{},
Server: integration.SimpleServerOptions{},
StartClient: integration.StartClientDirect,
RunTests: integration.TestSuite,
},
@ -102,7 +98,7 @@ var topologies = []integration.TestTopology{
// automatic fallback.
Name: "DERPForceWebSockets",
SetupNetworking: integration.SetupNetworkingEasyNAT,
ServerOptions: integration.ServerOptions{
Server: integration.SimpleServerOptions{
FailUpgradeDERP: false,
DERPWebsocketOnly: true,
},
@ -113,7 +109,7 @@ var topologies = []integration.TestTopology{
// Test that falling back to DERP over WebSocket works.
Name: "DERPFallbackWebSockets",
SetupNetworking: integration.SetupNetworkingEasyNAT,
ServerOptions: integration.ServerOptions{
Server: integration.SimpleServerOptions{
FailUpgradeDERP: true,
DERPWebsocketOnly: false,
},
@ -121,6 +117,13 @@ var topologies = []integration.TestTopology{
StartClient: integration.StartClientDERP,
RunTests: integration.TestSuite,
},
{
Name: "BasicLoopbackDERPNGINX",
SetupNetworking: integration.SetupNetworkingLoopback,
Server: integration.NGINXServerOptions{},
StartClient: integration.StartClientDERP,
RunTests: integration.TestSuite,
},
}
//nolint:paralleltest,tparallel
@ -180,24 +183,7 @@ func handleTestSubprocess(t *testing.T) {
switch *role {
case "server":
logger = logger.Named("server")
srv := http.Server{
Addr: *serverListenAddr,
Handler: topo.ServerOptions.Router(t, logger),
ReadTimeout: 10 * time.Second,
}
serveDone := make(chan struct{})
go func() {
defer close(serveDone)
err := srv.ListenAndServe()
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
t.Error("HTTP server error:", err)
}
}()
t.Cleanup(func() {
_ = srv.Close()
<-serveDone
})
topo.Server.StartServer(t, logger, *serverListenAddr)
// no exit
case "client":
@ -303,122 +289,13 @@ func startClientSubprocess(t *testing.T, topologyName string, networking integra
return startSubprocess(t, clientName, netNS, flags)
}
// startSubprocess starts a subprocess with the given flags and returns a
// channel that will receive the error when the subprocess exits. The returned
// function can be used to close the subprocess.
// startSubprocess launches the test binary with the same flags as the test, but
// with additional flags added.
//
// Do not call close then wait on the channel. Use the returned value from the
// function instead in this case.
// See integration.ExecBackground for more details.
func startSubprocess(t *testing.T, processName string, netNS *os.File, flags []string) (<-chan error, func() error) {
name := os.Args[0]
// Always use verbose mode since it gets piped to the parent test anyways.
args := append(os.Args[1:], append([]string{"-test.v=true"}, flags...)...)
if netNS != nil {
// We use nsenter to enter the namespace.
// We can't use `setns` easily from Golang in the parent process because
// you can't execute the syscall in the forked child thread before it
// execs.
// We can't use `setns` easily from Golang in the child process because
// by the time you call it, the process has already created multiple
// threads.
args = append([]string{"--net=/proc/self/fd/3", name}, args...)
name = "nsenter"
}
cmd := exec.Command(name, args...)
if netNS != nil {
cmd.ExtraFiles = []*os.File{netNS}
}
out := &testWriter{
name: processName,
t: t,
}
t.Cleanup(out.Flush)
cmd.Stdout = out
cmd.Stderr = out
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGTERM,
}
err := cmd.Start()
require.NoError(t, err)
waitErr := make(chan error, 1)
go func() {
err := cmd.Wait()
waitErr <- err
close(waitErr)
}()
closeFn := func() error {
_ = cmd.Process.Signal(syscall.SIGTERM)
select {
case <-time.After(5 * time.Second):
_ = cmd.Process.Kill()
case err := <-waitErr:
return err
}
return <-waitErr
}
t.Cleanup(func() {
select {
case err := <-waitErr:
if err != nil {
t.Logf("subprocess exited: " + err.Error())
}
return
default:
}
_ = closeFn()
})
return waitErr, closeFn
}
type testWriter struct {
mut sync.Mutex
name string
t *testing.T
capturedLines []string
}
func (w *testWriter) Write(p []byte) (n int, err error) {
w.mut.Lock()
defer w.mut.Unlock()
str := string(p)
split := strings.Split(str, "\n")
for _, s := range split {
if s == "" {
continue
}
// If a line begins with "\s*--- (PASS|FAIL)" or is just PASS or FAIL,
// then it's a test result line. We want to capture it and log it later.
trimmed := strings.TrimSpace(s)
if strings.HasPrefix(trimmed, "--- PASS") || strings.HasPrefix(trimmed, "--- FAIL") || trimmed == "PASS" || trimmed == "FAIL" {
// Also fail the test if we see a FAIL line.
if strings.Contains(trimmed, "FAIL") {
w.t.Errorf("subprocess logged test failure: %s: \t%s", w.name, s)
}
w.capturedLines = append(w.capturedLines, s)
continue
}
w.t.Logf("%s output: \t%s", w.name, s)
}
return len(p), nil
}
func (w *testWriter) Flush() {
w.mut.Lock()
defer w.mut.Unlock()
for _, s := range w.capturedLines {
w.t.Logf("%s output: \t%s", w.name, s)
}
w.capturedLines = nil
return integration.ExecBackground(t, processName, netNS, name, args)
}

View File

@ -6,12 +6,10 @@ package integration
import (
"bytes"
"fmt"
"net/url"
"os"
"os/exec"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/tailscale/netlink"
"golang.org/x/xerrors"
@ -19,29 +17,8 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/tailnet"
)
type TestTopology struct {
Name string
// SetupNetworking creates interfaces and network namespaces for the test.
// The most simple implementation is NetworkSetupDefault, which only creates
// a network namespace shared for all tests.
SetupNetworking func(t *testing.T, logger slog.Logger) TestNetworking
// ServerOptions is the configuration for the server. It's passed to the
// server process.
ServerOptions ServerOptions
// StartClient gets called in each client subprocess. It's expected to
// create the tailnet.Conn and ensure connectivity to it's peer.
StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn
// RunTests is the main test function. It's called in each of the client
// subprocesses. If tests can only run once, they should check the client ID
// and return early if it's not the expected one.
RunTests func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn)
}
type TestNetworking struct {
// ServerListenAddr is the IP address and port that the server listens on,
// passed to StartServer.