fix: disable relay if built-in DERP is disabled (#12654)

Fixes https://github.com/coder/coder/issues/12493
This commit is contained in:
Colin Adler 2024-03-21 16:53:41 -05:00 committed by GitHub
parent d3c9aaf57b
commit 37a05372fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 225 additions and 60 deletions

View File

@ -35,6 +35,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"gopkg.in/yaml.v3"
"tailscale.com/derp/derphttp"
"tailscale.com/types/key"
"cdr.dev/slog/sloggers/slogtest"
@ -1830,3 +1832,32 @@ func TestServer_InvalidDERP(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "A valid DERP map is required for networking to work")
}
func TestServer_DisabledDERP(t *testing.T) {
t.Parallel()
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancelFunc()
// Try to start a server with the built-in DERP server disabled and an
// external DERP map.
inv, cfg := clitest.New(t,
"server",
"--in-memory",
"--http-address", ":0",
"--access-url", "http://example.com",
"--derp-server-enable=false",
"--derp-config-url", "https://controlplane.tailscale.com/derpmap/default",
)
clitest.Start(t, inv.WithContext(ctx))
accessURL := waitAccessURL(t, cfg)
derpURL, err := accessURL.Parse("/derp")
require.NoError(t, err)
c, err := derphttp.NewClient(key.NewNode(), derpURL.String(), func(format string, args ...any) {})
require.NoError(t, err)
// DERP should fail to connect
err = c.Connect(ctx)
require.Error(t, err)
}

View File

@ -288,7 +288,7 @@ func New(options *Options) *API {
if options.PrometheusRegistry == nil {
options.PrometheusRegistry = prometheus.NewRegistry()
}
if options.DERPServer == nil {
if options.DERPServer == nil && options.DeploymentValues.DERP.Server.Enable {
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
}
if options.DERPMapUpdateFrequency == 0 {
@ -577,8 +577,6 @@ func New(options *Options) *API {
// replicas or instances of this middleware.
apiRateLimiter := httpmw.RateLimit(options.APIRateLimit, time.Minute)
derpHandler := derphttp.Handler(api.DERPServer)
derpHandler, api.derpCloseFunc = tailnet.WithWebsocketSupport(api.DERPServer, derpHandler)
// Register DERP on expvar HTTP handler, which we serve below in the router, c.f. expvar.Handler()
// These are the metrics the DERP server exposes.
// TODO: export via prometheus
@ -587,7 +585,9 @@ func New(options *Options) *API {
// register multiple times. In production there is only one Coderd and one DERP server per
// process, but in testing, we create multiple of both, so the Once protects us from
// panicking.
expvar.Publish("derp", api.DERPServer.ExpVar())
if options.DERPServer != nil {
expvar.Publish("derp", api.DERPServer.ExpVar())
}
})
cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)
@ -637,13 +637,18 @@ func New(options *Options) *API {
api.workspaceAppServer.Attach(r)
})
r.Route("/derp", func(r chi.Router) {
r.Get("/", derpHandler.ServeHTTP)
// This is used when UDP is blocked, and latency must be checked via HTTP(s).
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if options.DERPServer != nil {
derpHandler := derphttp.Handler(api.DERPServer)
derpHandler, api.derpCloseFunc = tailnet.WithWebsocketSupport(api.DERPServer, derpHandler)
r.Route("/derp", func(r chi.Router) {
r.Get("/", derpHandler.ServeHTTP)
// This is used when UDP is blocked, and latency must be checked via HTTP(s).
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
})
})
}
// Register callback handlers for each OAuth2 provider.
// We must support gitauth and externalauth for backwards compatibility.
@ -1067,9 +1072,11 @@ func New(options *Options) *API {
r.Use(httpmw.ExtractUserParam(options.Database))
r.Get("/debug-link", api.userDebugOIDC)
})
r.Route("/derp", func(r chi.Router) {
r.Get("/traffic", options.DERPServer.ServeDebugTraffic)
})
if options.DERPServer != nil {
r.Route("/derp", func(r chi.Router) {
r.Get("/traffic", options.DERPServer.ServeDebugTraffic)
})
}
r.Method("GET", "/expvar", expvar.Handler()) // contains DERP metrics as well as cmdline and memstats
})
})
@ -1197,7 +1204,9 @@ type API struct {
// Close waits for all WebSocket connections to drain before returning.
func (api *API) Close() error {
api.cancel()
api.derpCloseFunc()
if api.derpCloseFunc != nil {
api.derpCloseFunc()
}
api.WebsocketWaitMutex.Lock()
api.WebsocketWaitGroup.Wait()

View File

@ -429,7 +429,11 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
if !options.DeploymentValues.DERP.Server.Enable.Value() {
region = nil
}
derpMap, err := tailnet.NewDERPMap(ctx, region, stunAddresses, "", "", options.DeploymentValues.DERP.Config.BlockDirect.Value())
derpMap, err := tailnet.NewDERPMap(ctx, region, stunAddresses,
options.DeploymentValues.DERP.Config.URL.Value(),
options.DeploymentValues.DERP.Config.Path.Value(),
options.DeploymentValues.DERP.Config.BlockDirect.Value(),
)
require.NoError(t, err)
return func(h http.Handler) {

View File

@ -37,41 +37,42 @@ func (r *RootCmd) Server(_ func()) *serpent.Command {
}
}
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
if options.DeploymentValues.DERP.Server.Enable {
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
var meshKey string
err := options.Database.InTx(func(tx database.Store) error {
// This will block until the lock is acquired, and will be
// automatically released when the transaction ends.
err := tx.AcquireLock(ctx, database.LockIDEnterpriseDeploymentSetup)
if err != nil {
return xerrors.Errorf("acquire lock: %w", err)
}
var meshKey string
err := options.Database.InTx(func(tx database.Store) error {
// This will block until the lock is acquired, and will be
// automatically released when the transaction ends.
err := tx.AcquireLock(ctx, database.LockIDEnterpriseDeploymentSetup)
if err != nil {
return xerrors.Errorf("acquire lock: %w", err)
}
meshKey, err = tx.GetDERPMeshKey(ctx)
if err == nil {
meshKey, err = tx.GetDERPMeshKey(ctx)
if err == nil {
return nil
}
if !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get DERP mesh key: %w", err)
}
meshKey, err = cryptorand.String(32)
if err != nil {
return xerrors.Errorf("generate DERP mesh key: %w", err)
}
err = tx.InsertDERPMeshKey(ctx, meshKey)
if err != nil {
return xerrors.Errorf("insert DERP mesh key: %w", err)
}
return nil
}
if !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get DERP mesh key: %w", err)
}
meshKey, err = cryptorand.String(32)
}, nil)
if err != nil {
return xerrors.Errorf("generate DERP mesh key: %w", err)
return nil, nil, err
}
err = tx.InsertDERPMeshKey(ctx, meshKey)
if err != nil {
return xerrors.Errorf("insert DERP mesh key: %w", err)
if meshKey == "" {
return nil, nil, xerrors.New("mesh key is empty")
}
return nil
}, nil)
if err != nil {
return nil, nil, err
options.DERPServer.SetMeshKey(meshKey)
}
if meshKey == "" {
return nil, nil, xerrors.New("mesh key is empty")
}
options.DERPServer.SetMeshKey(meshKey)
options.Auditor = audit.NewAuditor(
options.Database,

View File

@ -1,38 +1,43 @@
package cli_test
import (
"fmt"
"context"
"io"
"net/http"
"net/url"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/cli/clitest"
"github.com/coder/coder/v2/cli/config"
"github.com/coder/coder/v2/enterprise/cli"
"github.com/coder/coder/v2/testutil"
)
// TestServer runs the enterprise server command
// and waits for /healthz to return "OK".
func TestServer(t *testing.T) {
func TestServer_Single(t *testing.T) {
t.Parallel()
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancelFunc()
var root cli.RootCmd
cmd, err := root.Command(root.EnterpriseSubcommands())
require.NoError(t, err)
port := testutil.RandomPort(t)
inv, _ := clitest.NewWithCommand(t, cmd,
inv, cfg := clitest.NewWithCommand(t, cmd,
"server",
"--in-memory",
"--http-address", fmt.Sprintf(":%d", port),
"--http-address", ":0",
"--access-url", "http://example.com",
)
waiter := clitest.StartWithWaiter(t, inv)
clitest.Start(t, inv.WithContext(ctx))
accessURL := waitAccessURL(t, cfg)
require.Eventually(t, func() bool {
reqCtx := testutil.Context(t, testutil.IntervalMedium)
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("http://localhost:%d/healthz", port), nil)
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, accessURL.String()+"/healthz", nil)
if err != nil {
panic(err)
}
@ -48,5 +53,20 @@ func TestServer(t *testing.T) {
}
return assert.Equal(t, "OK", string(bs))
}, testutil.WaitShort, testutil.IntervalMedium)
waiter.Cancel()
}
func waitAccessURL(t *testing.T, cfg config.Root) *url.URL {
t.Helper()
var err error
var rawURL string
require.Eventually(t, func() bool {
rawURL, err = cfg.URL().Read()
return err == nil && rawURL != ""
}, testutil.WaitLong, testutil.IntervalFast, "failed to get access URL")
accessURL, err := url.Parse(rawURL)
require.NoError(t, err, "failed to parse access URL")
return accessURL
}

View File

@ -418,6 +418,8 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
if err != nil {
return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err)
}
// We always want to run the replica manager even if we don't have DERP
// enabled, since it's used to detect other coder servers for licensing.
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
ID: api.AGPL.ID,
RelayAddress: options.DERPServerRelayAddress,
@ -428,7 +430,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
if err != nil {
return nil, xerrors.Errorf("initialize replica: %w", err)
}
api.derpMesh = derpmesh.New(options.Logger.Named("derpmesh"), api.DERPServer, meshTLSConfig)
if api.DERPServer != nil {
api.derpMesh = derpmesh.New(options.Logger.Named("derpmesh"), api.DERPServer, meshTLSConfig)
}
// Moon feature init. Proxyhealh is a go routine to periodically check
// the health of all workspace proxies.
@ -666,11 +670,18 @@ func (api *API) updateEntitlements(ctx context.Context) error {
}
api.replicaManager.SetCallback(func() {
addresses := make([]string, 0)
for _, replica := range api.replicaManager.Regional() {
addresses = append(addresses, replica.RelayAddress)
// Only update DERP mesh if the built-in server is enabled.
if api.Options.DeploymentValues.DERP.Server.Enable {
addresses := make([]string, 0)
for _, replica := range api.replicaManager.Regional() {
// Don't add replicas with an empty relay address.
if replica.RelayAddress == "" {
continue
}
addresses = append(addresses, replica.RelayAddress)
}
api.derpMesh.SetAddresses(addresses, false)
}
api.derpMesh.SetAddresses(addresses, false)
_ = api.updateEntitlements(ctx)
})
} else {

View File

@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"cdr.dev/slog/sloggers/slogtest"
agplaudit "github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
@ -27,7 +29,10 @@ import (
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/coder/v2/enterprise/dbcrypt"
"github.com/coder/coder/v2/enterprise/replicasync"
"github.com/coder/coder/v2/testutil"
"github.com/coder/retry"
"github.com/coder/serpent"
)
func TestMain(m *testing.M) {
@ -371,6 +376,83 @@ func TestExternalTokenEncryption(t *testing.T) {
})
}
func TestMultiReplica_EmptyRelayAddress(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
db, ps := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil)
_, _ = coderdenttest.New(t, &coderdenttest.Options{
EntitlementsUpdateInterval: 25 * time.Millisecond,
ReplicaSyncUpdateInterval: 25 * time.Millisecond,
Options: &coderdtest.Options{
Logger: &logger,
Database: db,
Pubsub: ps,
},
})
mgr, err := replicasync.New(ctx, logger, db, ps, &replicasync.Options{
ID: uuid.New(),
RelayAddress: "",
RegionID: 999,
UpdateInterval: testutil.IntervalFast,
})
require.NoError(t, err)
defer mgr.Close()
// Send a bunch of updates to see if the coderd will log errors.
{
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalMedium)
for r := retry.New(testutil.IntervalFast, testutil.IntervalFast); r.Wait(ctx); {
require.NoError(t, mgr.PublishUpdate())
}
cancel()
}
}
func TestMultiReplica_EmptyRelayAddress_DisabledDERP(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
db, ps := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil)
dv := coderdtest.DeploymentValues(t)
dv.DERP.Server.Enable = serpent.Bool(false)
dv.DERP.Config.URL = serpent.String("https://controlplane.tailscale.com/derpmap/default")
_, _ = coderdenttest.New(t, &coderdenttest.Options{
EntitlementsUpdateInterval: 25 * time.Millisecond,
ReplicaSyncUpdateInterval: 25 * time.Millisecond,
Options: &coderdtest.Options{
Logger: &logger,
Database: db,
Pubsub: ps,
DeploymentValues: dv,
},
})
mgr, err := replicasync.New(ctx, logger, db, ps, &replicasync.Options{
ID: uuid.New(),
RelayAddress: "",
RegionID: 999,
UpdateInterval: testutil.IntervalFast,
})
require.NoError(t, err)
defer mgr.Close()
// Send a bunch of updates to see if the coderd will log errors.
{
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalMedium)
for r := retry.New(testutil.IntervalFast, testutil.IntervalFast); r.Wait(ctx); {
require.NoError(t, mgr.PublishUpdate())
}
cancel()
}
}
// testDBAuthzRole returns a context with a subject that has a role
// with permissions required for test setup.
func testDBAuthzRole(ctx context.Context) context.Context {

View File

@ -17,7 +17,6 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/cliutil"
"github.com/coder/coder/v2/coderd/database"
@ -38,8 +37,9 @@ type Options struct {
TLSConfig *tls.Config
}
// New registers the replica with the database and periodically updates to ensure
// it's healthy. It contacts all other alive replicas to ensure they are reachable.
// New registers the replica with the database and periodically updates to
// ensure it's healthy. It contacts all other alive replicas to ensure they are
// reachable.
func New(ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub, options *Options) (*Manager, error) {
if options == nil {
options = &Options{}
@ -255,6 +255,13 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
if replica.ID == m.id {
continue
}
// Don't peer with nodes that have an empty relay address.
if replica.RelayAddress == "" {
m.logger.Debug(ctx, "peer doesn't have an address, skipping",
slog.F("replica_hostname", replica.Hostname),
)
continue
}
m.peers = append(m.peers, replica)
}
m.mutex.Unlock()