mirror of https://github.com/coder/coder.git
feat: add derp mesh health checking in workspace proxies (#12222)
This commit is contained in:
parent
6b0b87eb27
commit
d2a5b31b2b
|
@ -244,7 +244,7 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
|
|||
closers.Add(closeFunc)
|
||||
}
|
||||
|
||||
proxy, err := wsproxy.New(ctx, &wsproxy.Options{
|
||||
options := &wsproxy.Options{
|
||||
Logger: logger,
|
||||
Experiments: coderd.ReadExperiments(logger, cfg.Experiments.Value()),
|
||||
HTTPClient: httpClient,
|
||||
|
@ -264,7 +264,12 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
|
|||
DERPOnly: derpOnly.Value(),
|
||||
BlockDirect: cfg.DERP.Config.BlockDirect.Value(),
|
||||
DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(),
|
||||
})
|
||||
}
|
||||
if httpServers.TLSConfig != nil {
|
||||
options.TLSCertificates = httpServers.TLSConfig.Certificates
|
||||
}
|
||||
|
||||
proxy, err := wsproxy.New(ctx, options)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("create workspace proxy: %w", err)
|
||||
}
|
||||
|
|
|
@ -3,8 +3,6 @@ package coderd
|
|||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
|
@ -416,27 +414,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
|
|||
})
|
||||
}
|
||||
|
||||
meshRootCA := x509.NewCertPool()
|
||||
for _, certificate := range options.TLSCertificates {
|
||||
for _, certificatePart := range certificate.Certificate {
|
||||
certificate, err := x509.ParseCertificate(certificatePart)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
|
||||
}
|
||||
meshRootCA.AddCert(certificate)
|
||||
}
|
||||
}
|
||||
// This TLS configuration spoofs access from the access URL hostname
|
||||
// assuming that the certificates provided will cover that hostname.
|
||||
//
|
||||
// Replica sync and DERP meshing require accessing replicas via their
|
||||
// internal IP addresses, and if TLS is configured we use the same
|
||||
// certificates.
|
||||
meshTLSConfig := &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
Certificates: options.TLSCertificates,
|
||||
RootCAs: meshRootCA,
|
||||
ServerName: options.AccessURL.Hostname(),
|
||||
meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err)
|
||||
}
|
||||
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
|
||||
ID: api.AGPL.ID,
|
||||
|
|
|
@ -44,6 +44,9 @@ type ProxyOptions struct {
|
|||
// region.
|
||||
Token string
|
||||
|
||||
// ReplicaPingCallback is optional.
|
||||
ReplicaPingCallback func(replicas []codersdk.Replica, err string)
|
||||
|
||||
// FlushStats is optional
|
||||
FlushStats chan chan<- struct{}
|
||||
}
|
||||
|
@ -158,6 +161,7 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders
|
|||
DERPEnabled: !options.DerpDisabled,
|
||||
DERPOnly: options.DerpOnly,
|
||||
DERPServerRelayAddress: serverURL.String(),
|
||||
ReplicaErrCallback: options.ReplicaPingCallback,
|
||||
StatsCollectorOptions: statsCollectorOptions,
|
||||
BlockDirect: options.BlockDirect,
|
||||
})
|
||||
|
|
|
@ -3,6 +3,7 @@ package replicasync
|
|||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -265,45 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
|
|||
},
|
||||
}
|
||||
defer client.CloseIdleConnections()
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
failed := make([]string, 0)
|
||||
for _, peer := range m.Regional() {
|
||||
wg.Add(1)
|
||||
|
||||
peers := m.Regional()
|
||||
errs := make(chan error, len(peers))
|
||||
for _, peer := range peers {
|
||||
go func(peer database.Replica) {
|
||||
defer wg.Done()
|
||||
ra, err := url.Parse(peer.RelayAddress)
|
||||
err := PingPeerReplica(ctx, client, peer.RelayAddress)
|
||||
if err != nil {
|
||||
m.logger.Warn(ctx, "could not parse relay address",
|
||||
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
|
||||
errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err)
|
||||
m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown",
|
||||
slog.F("replica_hostname", peer.Hostname),
|
||||
slog.F("replica_relay_address", peer.RelayAddress),
|
||||
slog.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
target, err := ra.Parse("/derp/latency-check")
|
||||
if err != nil {
|
||||
m.logger.Warn(ctx, "could not resolve /derp/latency-check endpoint",
|
||||
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
|
||||
if err != nil {
|
||||
m.logger.Warn(ctx, "create http request for relay probe",
|
||||
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
|
||||
return
|
||||
}
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err))
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
_ = res.Body.Close()
|
||||
errs <- nil
|
||||
}(peer)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
replicaErrs := make([]string, 0, len(peers))
|
||||
for i := 0; i < len(peers); i++ {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
replicaErrs = append(replicaErrs, err.Error())
|
||||
}
|
||||
}
|
||||
replicaError := ""
|
||||
if len(failed) > 0 {
|
||||
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", "))
|
||||
if len(replicaErrs) > 0 {
|
||||
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", "))
|
||||
}
|
||||
|
||||
databaseLatency, err := m.db.Ping(ctx)
|
||||
|
@ -363,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PingPeerReplica pings a peer replica over it's internal relay address to
|
||||
// ensure it's reachable and alive for health purposes.
|
||||
func PingPeerReplica(ctx context.Context, client http.Client, relayAddress string) error {
|
||||
ra, err := url.Parse(relayAddress)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parse relay address %q: %w", relayAddress, err)
|
||||
}
|
||||
target, err := ra.Parse("/derp/latency-check")
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parse latency-check URL: %w", err)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("create request: %w", err)
|
||||
}
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("do probe: %w", err)
|
||||
}
|
||||
_ = res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return xerrors.Errorf("unexpected status code: %d", res.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Self represents the current replica.
|
||||
func (m *Manager) Self() database.Replica {
|
||||
m.mutex.Lock()
|
||||
|
@ -466,3 +483,29 @@ func (m *Manager) Close() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers
|
||||
// in the DERP mesh over private networking. It overrides the ServerName to be
|
||||
// the expected public hostname of the peer, and trusts all of the TLS server
|
||||
// certificates used by this replica (as we expect all replicas to use the same
|
||||
// TLS certificates).
|
||||
func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) {
|
||||
meshRootCA := x509.NewCertPool()
|
||||
for _, certificate := range tlsCertificates {
|
||||
for _, certificatePart := range certificate.Certificate {
|
||||
parsedCert, err := x509.ParseCertificate(certificatePart)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parse certificate %s: %w", parsedCert.Subject.CommonName, err)
|
||||
}
|
||||
meshRootCA.AddCert(parsedCert)
|
||||
}
|
||||
}
|
||||
|
||||
// This TLS configuration trusts the built-in TLS certificates and forces
|
||||
// the server name to be the public hostname.
|
||||
return &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
RootCAs: meshRootCA,
|
||||
ServerName: hostname,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -286,7 +286,7 @@ func (d *derpyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
d.Add(1)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusUpgradeRequired)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (d *derpyHandler) requireOnlyDERPPaths(t *testing.T) {
|
||||
|
|
|
@ -3,14 +3,15 @@ package wsproxy
|
|||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/xerrors"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/derp/derphttp"
|
||||
|
@ -35,6 +37,7 @@ import (
|
|||
"github.com/coder/coder/v2/coderd/workspaceapps"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/enterprise/derpmesh"
|
||||
"github.com/coder/coder/v2/enterprise/replicasync"
|
||||
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
|
||||
"github.com/coder/coder/v2/site"
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
|
@ -79,6 +82,10 @@ type Options struct {
|
|||
// negotiating direct connections.
|
||||
BlockDirect bool
|
||||
|
||||
// ReplicaErrCallback is called when the proxy replica successfully or
|
||||
// unsuccessfully pings its peers in the mesh.
|
||||
ReplicaErrCallback func(replicas []codersdk.Replica, err string)
|
||||
|
||||
ProxySessionToken string
|
||||
// AllowAllCors will set all CORs headers to '*'.
|
||||
// By default, CORs is set to accept external requests
|
||||
|
@ -124,8 +131,12 @@ type Server struct {
|
|||
SDKClient *wsproxysdk.Client
|
||||
|
||||
// DERP
|
||||
derpMesh *derpmesh.Mesh
|
||||
latestDERPMap atomic.Pointer[tailcfg.DERPMap]
|
||||
derpMesh *derpmesh.Mesh
|
||||
derpMeshTLSConfig *tls.Config
|
||||
replicaPingSingleflight singleflight.Group
|
||||
replicaErrMut sync.Mutex
|
||||
replicaErr string
|
||||
latestDERPMap atomic.Pointer[tailcfg.DERPMap]
|
||||
|
||||
// Used for graceful shutdown. Required for the dialer.
|
||||
ctx context.Context
|
||||
|
@ -169,29 +180,10 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
|||
return nil, xerrors.Errorf("%q is a workspace proxy, not a primary coderd instance", opts.DashboardURL)
|
||||
}
|
||||
|
||||
meshRootCA := x509.NewCertPool()
|
||||
for _, certificate := range opts.TLSCertificates {
|
||||
for _, certificatePart := range certificate.Certificate {
|
||||
certificate, err := x509.ParseCertificate(certificatePart)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
|
||||
}
|
||||
meshRootCA.AddCert(certificate)
|
||||
}
|
||||
meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(opts.AccessURL.Hostname(), opts.TLSCertificates)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("create DERP mesh tls config: %w", err)
|
||||
}
|
||||
// This TLS configuration spoofs access from the access URL hostname
|
||||
// assuming that the certificates provided will cover that hostname.
|
||||
//
|
||||
// Replica sync and DERP meshing require accessing replicas via their
|
||||
// internal IP addresses, and if TLS is configured we use the same
|
||||
// certificates.
|
||||
meshTLSConfig := &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
Certificates: opts.TLSCertificates,
|
||||
RootCAs: meshRootCA,
|
||||
ServerName: opts.AccessURL.Hostname(),
|
||||
}
|
||||
|
||||
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("net.derp")))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -205,14 +197,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
|||
PrometheusRegistry: opts.PrometheusRegistry,
|
||||
SDKClient: client,
|
||||
derpMesh: derpmesh.New(opts.Logger.Named("net.derpmesh"), derpServer, meshTLSConfig),
|
||||
derpMeshTLSConfig: meshTLSConfig,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
// Register the workspace proxy with the primary coderd instance and start a
|
||||
// goroutine to periodically re-register.
|
||||
replicaID := uuid.New()
|
||||
osHostname := cliutil.Hostname()
|
||||
registerLoop, regResp, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{
|
||||
Logger: opts.Logger,
|
||||
Request: wsproxysdk.RegisterWorkspaceProxyRequest{
|
||||
|
@ -220,8 +211,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
|||
WildcardHostname: opts.AppHostname,
|
||||
DerpEnabled: opts.DERPEnabled,
|
||||
DerpOnly: opts.DERPOnly,
|
||||
ReplicaID: replicaID,
|
||||
ReplicaHostname: osHostname,
|
||||
ReplicaID: uuid.New(),
|
||||
ReplicaHostname: cliutil.Hostname(),
|
||||
ReplicaError: "",
|
||||
ReplicaRelayAddress: opts.DERPServerRelayAddress,
|
||||
Version: buildinfo.Version(),
|
||||
|
@ -437,9 +428,10 @@ func (s *Server) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) {
|
||||
// TODO: we should probably ping replicas similarly to the replicasync
|
||||
// package in the primary and update req.ReplicaError accordingly.
|
||||
func (s *Server) mutateRegister(req *wsproxysdk.RegisterWorkspaceProxyRequest) {
|
||||
s.replicaErrMut.Lock()
|
||||
defer s.replicaErrMut.Unlock()
|
||||
req.ReplicaError = s.replicaErr
|
||||
}
|
||||
|
||||
func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) error {
|
||||
|
@ -452,9 +444,82 @@ func (s *Server) handleRegister(res wsproxysdk.RegisterWorkspaceProxyResponse) e
|
|||
|
||||
s.latestDERPMap.Store(res.DERPMap)
|
||||
|
||||
go s.pingSiblingReplicas(res.SiblingReplicas)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) pingSiblingReplicas(replicas []codersdk.Replica) {
|
||||
if len(replicas) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Avoid pinging multiple times at once if the list hasn't changed.
|
||||
relayURLs := make([]string, len(replicas))
|
||||
for i, r := range replicas {
|
||||
relayURLs[i] = r.RelayAddress
|
||||
}
|
||||
slices.Sort(relayURLs)
|
||||
singleflightStr := strings.Join(relayURLs, " ") // URLs can't contain spaces.
|
||||
|
||||
//nolint:dogsled
|
||||
_, _, _ = s.replicaPingSingleflight.Do(singleflightStr, func() (any, error) {
|
||||
const (
|
||||
perReplicaTimeout = 3 * time.Second
|
||||
fullTimeout = 10 * time.Second
|
||||
)
|
||||
ctx, cancel := context.WithTimeout(s.ctx, fullTimeout)
|
||||
defer cancel()
|
||||
|
||||
client := http.Client{
|
||||
Timeout: perReplicaTimeout,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: s.derpMeshTLSConfig,
|
||||
DisableKeepAlives: true,
|
||||
},
|
||||
}
|
||||
defer client.CloseIdleConnections()
|
||||
|
||||
errs := make(chan error, len(replicas))
|
||||
for _, peer := range replicas {
|
||||
go func(peer codersdk.Replica) {
|
||||
err := replicasync.PingPeerReplica(ctx, client, peer.RelayAddress)
|
||||
if err != nil {
|
||||
errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err)
|
||||
s.Logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown",
|
||||
slog.F("replica_hostname", peer.Hostname),
|
||||
slog.F("replica_relay_address", peer.RelayAddress),
|
||||
slog.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
errs <- nil
|
||||
}(peer)
|
||||
}
|
||||
|
||||
replicaErrs := make([]string, 0, len(replicas))
|
||||
for i := 0; i < len(replicas); i++ {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
replicaErrs = append(replicaErrs, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
s.replicaErrMut.Lock()
|
||||
defer s.replicaErrMut.Unlock()
|
||||
s.replicaErr = ""
|
||||
if len(replicaErrs) > 0 {
|
||||
s.replicaErr = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", "))
|
||||
}
|
||||
if s.Options.ReplicaErrCallback != nil {
|
||||
s.Options.ReplicaErrCallback(replicas, s.replicaErr)
|
||||
}
|
||||
|
||||
//nolint:nilnil // we don't actually use the return value of the
|
||||
// singleflight here
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleRegisterFailure(err error) {
|
||||
if s.ctx.Err() != nil {
|
||||
return
|
||||
|
@ -523,8 +588,14 @@ func (s *Server) healthReport(rw http.ResponseWriter, r *http.Request) {
|
|||
fmt.Sprintf("version mismatch: primary coderd (%s) != workspace proxy (%s)", primaryBuild.Version, buildinfo.Version()))
|
||||
}
|
||||
|
||||
s.replicaErrMut.Lock()
|
||||
if s.replicaErr != "" {
|
||||
report.Errors = append(report.Errors, "High availability networking: it appears you are running more than one replica of the proxy, but the replicas are unable to establish a mesh for networking: "+s.replicaErr)
|
||||
}
|
||||
s.replicaErrMut.Unlock()
|
||||
|
||||
// TODO: We should hit the deployment config endpoint and do some config
|
||||
// checks. We can check the version from the X-CODER-BUILD-VERSION header
|
||||
// checks.
|
||||
|
||||
httpapi.Write(r.Context(), rw, http.StatusOK, report)
|
||||
}
|
||||
|
|
|
@ -2,8 +2,11 @@ package wsproxy_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -20,6 +23,7 @@ import (
|
|||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/agent/agenttest"
|
||||
"github.com/coder/coder/v2/buildinfo"
|
||||
"github.com/coder/coder/v2/cli/clibase"
|
||||
"github.com/coder/coder/v2/coderd/coderdtest"
|
||||
"github.com/coder/coder/v2/coderd/healthcheck/derphealth"
|
||||
|
@ -29,6 +33,7 @@ import (
|
|||
"github.com/coder/coder/v2/cryptorand"
|
||||
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
|
||||
"github.com/coder/coder/v2/enterprise/coderd/license"
|
||||
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
|
||||
"github.com/coder/coder/v2/provisioner/echo"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
@ -128,21 +133,20 @@ func TestDERP(t *testing.T) {
|
|||
})
|
||||
|
||||
// Create a proxy that is never started.
|
||||
createProxyCtx := testutil.Context(t, testutil.WaitLong)
|
||||
_, err := client.CreateWorkspaceProxy(createProxyCtx, codersdk.CreateWorkspaceProxyRequest{
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
_, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
|
||||
Name: "never-started-proxy",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for both running proxies to become healthy.
|
||||
require.Eventually(t, func() bool {
|
||||
healthCtx := testutil.Context(t, testutil.WaitLong)
|
||||
err := api.ProxyHealth.ForceUpdate(healthCtx)
|
||||
err := api.ProxyHealth.ForceUpdate(ctx)
|
||||
if !assert.NoError(t, err) {
|
||||
return false
|
||||
}
|
||||
|
||||
regions, err := client.Regions(healthCtx)
|
||||
regions, err := client.Regions(ctx)
|
||||
if !assert.NoError(t, err) {
|
||||
return false
|
||||
}
|
||||
|
@ -264,7 +268,8 @@ resourceLoop:
|
|||
t.Run("ConnectDERP", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
connInfo, err := client.WorkspaceAgentConnectionInfo(testutil.Context(t, testutil.WaitLong), agentID)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, connInfo.DERPMap)
|
||||
require.Len(t, connInfo.DERPMap.Regions, 3+len(api.DeploymentValues.DERP.Server.STUNAddresses.Value()))
|
||||
|
@ -287,7 +292,6 @@ resourceLoop:
|
|||
OmitDefaultRegions: true,
|
||||
}
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
report := derphealth.Report{}
|
||||
report.Run(ctx, &derphealth.ReportOptions{
|
||||
DERPMap: derpMap,
|
||||
|
@ -350,14 +354,14 @@ func TestDERPEndToEnd(t *testing.T) {
|
|||
})
|
||||
|
||||
// Wait for the proxy to become healthy.
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
require.Eventually(t, func() bool {
|
||||
healthCtx := testutil.Context(t, testutil.WaitLong)
|
||||
err := api.ProxyHealth.ForceUpdate(healthCtx)
|
||||
err := api.ProxyHealth.ForceUpdate(ctx)
|
||||
if !assert.NoError(t, err) {
|
||||
return false
|
||||
}
|
||||
|
||||
regions, err := client.Regions(healthCtx)
|
||||
regions, err := client.Regions(ctx)
|
||||
if !assert.NoError(t, err) {
|
||||
return false
|
||||
}
|
||||
|
@ -425,7 +429,6 @@ resourceLoop:
|
|||
_ = coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
|
||||
|
||||
// Connect to the workspace agent.
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
conn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{
|
||||
Logger: slogtest.Make(t, &slogtest.Options{
|
||||
IgnoreErrors: true,
|
||||
|
@ -546,6 +549,243 @@ func TestDERPMesh(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestWorkspaceProxyDERPMeshProbe ensures that each replica pings every other
|
||||
// replica in the same region as itself periodically.
|
||||
func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
|
||||
t.Parallel()
|
||||
createProxyRegion := func(ctx context.Context, t *testing.T, client *codersdk.Client, name string) codersdk.UpdateWorkspaceProxyResponse {
|
||||
t.Helper()
|
||||
proxyRes, err := client.CreateWorkspaceProxy(ctx, codersdk.CreateWorkspaceProxyRequest{
|
||||
Name: name,
|
||||
Icon: "/emojis/flag.png",
|
||||
})
|
||||
require.NoError(t, err, "failed to create workspace proxy")
|
||||
return proxyRes
|
||||
}
|
||||
|
||||
registerBrokenProxy := func(ctx context.Context, t *testing.T, primaryAccessURL *url.URL, accessURL, token string) {
|
||||
t.Helper()
|
||||
// Create a HTTP server that always replies with 500.
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
rw.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Register a proxy.
|
||||
wsproxyClient := wsproxysdk.New(primaryAccessURL)
|
||||
wsproxyClient.SetSessionToken(token)
|
||||
|
||||
hostname, err := cryptorand.String(6)
|
||||
require.NoError(t, err)
|
||||
_, err = wsproxyClient.RegisterWorkspaceProxy(ctx, wsproxysdk.RegisterWorkspaceProxyRequest{
|
||||
AccessURL: accessURL,
|
||||
WildcardHostname: "",
|
||||
DerpEnabled: true,
|
||||
DerpOnly: false,
|
||||
ReplicaID: uuid.New(),
|
||||
ReplicaHostname: hostname,
|
||||
ReplicaError: "",
|
||||
ReplicaRelayAddress: srv.URL,
|
||||
Version: buildinfo.Version(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("ProbeOK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
deploymentValues := coderdtest.DeploymentValues(t)
|
||||
deploymentValues.Experiments = []string{
|
||||
"*",
|
||||
}
|
||||
|
||||
client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
|
||||
Options: &coderdtest.Options{
|
||||
DeploymentValues: deploymentValues,
|
||||
AppHostname: "*.primary.test.coder.com",
|
||||
IncludeProvisionerDaemon: true,
|
||||
RealIPConfig: &httpmw.RealIPConfig{
|
||||
TrustedOrigins: []*net.IPNet{{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Mask: net.CIDRMask(8, 32),
|
||||
}},
|
||||
TrustedHeaders: []string{
|
||||
"CF-Connecting-IP",
|
||||
},
|
||||
},
|
||||
},
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureWorkspaceProxy: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
_ = closer.Close()
|
||||
})
|
||||
|
||||
// Register but don't start a proxy in a different region. This
|
||||
// shouldn't affect the mesh since it's in a different region.
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
fakeProxyRes := createProxyRegion(ctx, t, client, "fake-proxy")
|
||||
registerBrokenProxy(ctx, t, api.AccessURL, "https://fake-proxy.test.coder.com", fakeProxyRes.ProxyToken)
|
||||
|
||||
proxyURL, err := url.Parse("https://proxy1.test.coder.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create 6 proxy replicas.
|
||||
const count = 6
|
||||
var (
|
||||
sessionToken = ""
|
||||
proxies = [count]coderdenttest.WorkspaceProxy{}
|
||||
replicaPingDone = [count]bool{}
|
||||
)
|
||||
for i := range proxies {
|
||||
i := i
|
||||
proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{
|
||||
Name: "proxy-1",
|
||||
Token: sessionToken,
|
||||
ProxyURL: proxyURL,
|
||||
ReplicaPingCallback: func(replicas []codersdk.Replica, err string) {
|
||||
if len(replicas) != count-1 {
|
||||
// Still warming up...
|
||||
return
|
||||
}
|
||||
replicaPingDone[i] = true
|
||||
assert.Emptyf(t, err, "replica %d ping callback error", i)
|
||||
},
|
||||
})
|
||||
if i == 0 {
|
||||
sessionToken = proxies[i].Options.ProxySessionToken
|
||||
}
|
||||
}
|
||||
|
||||
// Force all proxies to re-register immediately. This ensures the DERP
|
||||
// mesh is up-to-date. In production this will happen automatically
|
||||
// after about 15 seconds.
|
||||
for i, proxy := range proxies {
|
||||
err := proxy.RegisterNow()
|
||||
require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
|
||||
}
|
||||
|
||||
// Ensure that all proxies have pinged.
|
||||
require.Eventually(t, func() bool {
|
||||
ok := true
|
||||
for i := range proxies {
|
||||
if !replicaPingDone[i] {
|
||||
t.Logf("replica %d has not pinged yet", i)
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
return ok
|
||||
}, testutil.WaitLong, testutil.IntervalSlow)
|
||||
t.Log("all replicas have pinged")
|
||||
|
||||
// Check they're all healthy according to /healthz-report.
|
||||
for _, proxy := range proxies {
|
||||
// GET /healthz-report
|
||||
u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"})
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
require.NoError(t, err)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
var respJSON codersdk.ProxyHealthReport
|
||||
err = json.NewDecoder(resp.Body).Decode(&respJSON)
|
||||
resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Empty(t, respJSON.Errors, "proxy is not healthy")
|
||||
}
|
||||
})
|
||||
|
||||
// Register one proxy, then pretend to register 5 others. This should cause
|
||||
// the mesh to fail and return an error.
|
||||
t.Run("ProbeFail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
deploymentValues := coderdtest.DeploymentValues(t)
|
||||
deploymentValues.Experiments = []string{
|
||||
"*",
|
||||
}
|
||||
|
||||
client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
|
||||
Options: &coderdtest.Options{
|
||||
DeploymentValues: deploymentValues,
|
||||
AppHostname: "*.primary.test.coder.com",
|
||||
IncludeProvisionerDaemon: true,
|
||||
RealIPConfig: &httpmw.RealIPConfig{
|
||||
TrustedOrigins: []*net.IPNet{{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Mask: net.CIDRMask(8, 32),
|
||||
}},
|
||||
TrustedHeaders: []string{
|
||||
"CF-Connecting-IP",
|
||||
},
|
||||
},
|
||||
},
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureWorkspaceProxy: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
_ = closer.Close()
|
||||
})
|
||||
|
||||
proxyURL, err := url.Parse("https://proxy2.test.coder.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create 1 real proxy replica.
|
||||
const fakeCount = 5
|
||||
replicaPingErr := make(chan string, 4)
|
||||
proxy := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{
|
||||
Name: "proxy-2",
|
||||
ProxyURL: proxyURL,
|
||||
ReplicaPingCallback: func(replicas []codersdk.Replica, err string) {
|
||||
if len(replicas) != fakeCount {
|
||||
// Still warming up...
|
||||
return
|
||||
}
|
||||
replicaPingErr <- err
|
||||
},
|
||||
})
|
||||
|
||||
// Register (but don't start wsproxy.Server) 5 other proxies in the same
|
||||
// region. Since they registered recently they should be included in the
|
||||
// mesh. We create a HTTP server on the relay address that always
|
||||
// responds with 500 so probes fail.
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
for i := 0; i < fakeCount; i++ {
|
||||
registerBrokenProxy(ctx, t, api.AccessURL, proxyURL.String(), proxy.Options.ProxySessionToken)
|
||||
}
|
||||
|
||||
// Force the proxy to re-register immediately.
|
||||
err = proxy.RegisterNow()
|
||||
require.NoError(t, err, "failed to force proxy to re-register")
|
||||
|
||||
// Wait for the ping to fail.
|
||||
replicaErr := testutil.RequireRecvCtx(ctx, t, replicaPingErr)
|
||||
require.NotEmpty(t, replicaErr, "replica ping error")
|
||||
|
||||
// GET /healthz-report
|
||||
u := proxy.ServerURL.ResolveReference(&url.URL{Path: "/healthz-report"})
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
require.NoError(t, err)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
var respJSON codersdk.ProxyHealthReport
|
||||
err = json.NewDecoder(resp.Body).Decode(&respJSON)
|
||||
resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, respJSON.Errors, 1, "proxy is healthy")
|
||||
require.Contains(t, respJSON.Errors[0], "High availability networking")
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
Loading…
Reference in New Issue