feat: add endpoint to get listening ports in agent (#4260)

This commit is contained in:
Dean Sheather 2022-10-06 22:38:22 +10:00 committed by GitHub
parent bbe2baf3f6
commit 1386465631
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 501 additions and 1 deletions

View File

@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net"
"net/http"
"net/netip"
"os"
"os/exec"
@ -206,6 +207,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
}
}()
reconnectingPTYListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetReconnectingPTYPort))
if err != nil {
a.logger.Critical(ctx, "listen for reconnecting pty", slog.Error(err))
@ -240,6 +242,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
go a.handleReconnectingPTY(ctx, msg, conn)
}
}()
speedtestListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetSpeedtestPort))
if err != nil {
a.logger.Critical(ctx, "listen for speedtest", slog.Error(err))
@ -261,6 +264,31 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
}()
}
}()
statisticsListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetStatisticsPort))
if err != nil {
a.logger.Critical(ctx, "listen for statistics", slog.Error(err))
return
}
go func() {
defer statisticsListener.Close()
server := &http.Server{
Handler: a.statisticsHandler(),
ReadTimeout: 20 * time.Second,
ReadHeaderTimeout: 20 * time.Second,
WriteTimeout: 20 * time.Second,
ErrorLog: slog.Stdlib(ctx, a.logger.Named("statistics_http_server"), slog.LevelInfo),
}
go func() {
<-ctx.Done()
_ = server.Close()
}()
err = server.Serve(statisticsListener)
if err != nil && !xerrors.Is(err, http.ErrServerClosed) && !strings.Contains(err.Error(), "use of closed network connection") {
a.logger.Critical(ctx, "serve statistics HTTP server", slog.Error(err))
}
}()
}
// runCoordinator listens for nodes and updates the self-node as it changes.

65
agent/ports_supported.go Normal file
View File

@ -0,0 +1,65 @@
//go:build linux || windows
// +build linux windows
package agent
import (
"time"
"github.com/cakturk/go-netstat/netstat"
"golang.org/x/xerrors"
"github.com/coder/coder/codersdk"
)
func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
lp.mut.Lock()
defer lp.mut.Unlock()
if time.Since(lp.mtime) < time.Second {
// copy
ports := make([]codersdk.ListeningPort, len(lp.ports))
copy(ports, lp.ports)
return ports, nil
}
tabs, err := netstat.TCPSocks(func(s *netstat.SockTabEntry) bool {
return s.State == netstat.Listen
})
if err != nil {
return nil, xerrors.Errorf("scan listening ports: %w", err)
}
seen := make(map[uint16]struct{}, len(tabs))
ports := []codersdk.ListeningPort{}
for _, tab := range tabs {
if tab.LocalAddr == nil || tab.LocalAddr.Port < uint16(codersdk.MinimumListeningPort) {
continue
}
// Don't include ports that we've already seen. This can happen on
// Windows, and maybe on Linux if you're using a shared listener socket.
if _, ok := seen[tab.LocalAddr.Port]; ok {
continue
}
seen[tab.LocalAddr.Port] = struct{}{}
procName := ""
if tab.Process != nil {
procName = tab.Process.Name
}
ports = append(ports, codersdk.ListeningPort{
ProcessName: procName,
Network: codersdk.ListeningPortNetworkTCP,
Port: tab.LocalAddr.Port,
})
}
lp.ports = ports
lp.mtime = time.Now()
// copy
ports = make([]codersdk.ListeningPort, len(lp.ports))
copy(ports, lp.ports)
return ports, nil
}

View File

@ -0,0 +1,13 @@
//go:build !linux && !windows
// +build !linux,!windows
package agent
import "github.com/coder/coder/codersdk"
func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
// Can't scan for ports on non-linux or non-windows systems at the moment.
// The UI will not show any "no ports found" message to the user, so the
// user won't suspect a thing.
return []codersdk.ListeningPort{}, nil
}

49
agent/statsendpoint.go Normal file
View File

@ -0,0 +1,49 @@
package agent
import (
"net/http"
"sync"
"time"
"github.com/go-chi/chi"
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/codersdk"
)
func (*agent) statisticsHandler() http.Handler {
r := chi.NewRouter()
r.Get("/", func(rw http.ResponseWriter, r *http.Request) {
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.Response{
Message: "Hello from the agent!",
})
})
lp := &listeningPortsHandler{}
r.Get("/api/v0/listening-ports", lp.handler)
return r
}
type listeningPortsHandler struct {
mut sync.Mutex
ports []codersdk.ListeningPort
mtime time.Time
}
// handler returns a list of listening ports. This is tested by coderd's
// TestWorkspaceAgentListeningPorts test.
func (lp *listeningPortsHandler) handler(rw http.ResponseWriter, r *http.Request) {
ports, err := lp.getListeningPorts()
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Could not scan for listening ports.",
Detail: err.Error(),
})
return
}
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.ListeningPortsResponse{
Ports: ports,
})
}

View File

@ -438,6 +438,7 @@ func New(options *Options) *API {
)
r.Get("/", api.workspaceAgent)
r.Get("/pty", api.workspaceAgentPTY)
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
r.Get("/connection", api.workspaceAgentConnection)
r.Get("/coordinate", api.workspaceAgentClientCoordinate)
// TODO: This can be removed in October. It allows for a friendly

View File

@ -219,6 +219,52 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(ptNetConn, wsNetConn)
}
func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspace := httpmw.WorkspaceParam(r)
workspaceAgent := httpmw.WorkspaceAgentParam(r)
if !api.Authorize(r, rbac.ActionRead, workspace) {
httpapi.ResourceNotFound(rw)
return
}
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Detail: err.Error(),
})
return
}
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
httpapi.Write(ctx, rw, http.StatusPreconditionRequired, codersdk.Response{
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
})
return
}
agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error dialing workspace agent.",
Detail: err.Error(),
})
return
}
defer release()
portsResponse, err := agentConn.ListeningPorts(ctx)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching listening ports.",
Detail: err.Error(),
})
return
}
httpapi.Write(ctx, rw, http.StatusOK, portsResponse)
}
func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*codersdk.AgentConn, error) {
clientConn, serverConn := net.Pipe()
go func() {

View File

@ -4,7 +4,9 @@ import (
"bufio"
"context"
"encoding/json"
"net"
"runtime"
"strconv"
"strings"
"testing"
"time"
@ -363,6 +365,133 @@ func TestWorkspaceAgentPTY(t *testing.T) {
expectLine(matchEchoOutput)
}
func TestWorkspaceAgentListeningPorts(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
coderdPort, err := strconv.Atoi(client.URL.Port())
require.NoError(t, err)
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionDryRun: echo.ProvisionComplete,
Provision: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
})
t.Cleanup(func() {
_ = agentCloser.Close()
})
resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
t.Run("LinuxAndWindows", func(t *testing.T) {
t.Parallel()
if runtime.GOOS != "linux" && runtime.GOOS != "windows" {
t.Skip("only runs on linux and windows")
return
}
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
// Create a TCP listener on a random port that we expect to see in the
// response.
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
defer l.Close()
tcpAddr, _ := l.Addr().(*net.TCPAddr)
// List ports and ensure that the port we expect to see is there.
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)
var (
expected = map[uint16]bool{
// expect the listener we made
uint16(tcpAddr.Port): false,
// expect the coderdtest server
uint16(coderdPort): false,
}
)
for _, port := range res.Ports {
if port.Network == codersdk.ListeningPortNetworkTCP {
if val, ok := expected[port.Port]; ok {
if val {
t.Fatalf("expected to find TCP port %d only once in response", port.Port)
}
}
expected[port.Port] = true
}
}
for port, found := range expected {
if !found {
t.Fatalf("expected to find TCP port %d in response", port)
}
}
// Close the listener and check that the port is no longer in the response.
require.NoError(t, l.Close())
time.Sleep(2 * time.Second) // avoid cache
res, err = client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)
for _, port := range res.Ports {
if port.Network == codersdk.ListeningPortNetworkTCP && port.Port == uint16(tcpAddr.Port) {
t.Fatalf("expected to not find TCP port %d in response", tcpAddr.Port)
}
}
})
t.Run("Darwin", func(t *testing.T) {
t.Parallel()
if runtime.GOOS != "darwin" {
t.Skip("only runs on darwin")
return
}
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
// Create a TCP listener on a random port.
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
defer l.Close()
// List ports and ensure that the list is empty because we're on darwin.
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)
require.Len(t, res.Ports, 0)
})
}
func TestWorkspaceAgentAppHealth(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{

View File

@ -11,6 +11,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
@ -486,6 +487,27 @@ func (api *API) proxyWorkspaceApplication(proxyApp proxyApplication, rw http.Res
return
}
// Verify that the port is allowed. See the docs above
// `codersdk.MinimumListeningPort` for more details.
port := appURL.Port()
if port != "" {
portInt, err := strconv.Atoi(port)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: fmt.Sprintf("App URL %q has an invalid port %q.", internalURL, port),
Detail: err.Error(),
})
return
}
if portInt < codersdk.MinimumListeningPort {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: fmt.Sprintf("Application port %d is not permitted. Coder reserves ports less than %d for internal use.", portInt, codersdk.MinimumListeningPort),
})
return
}
}
// Ensure path and query parameter correctness.
if proxyApp.Path == "" {
// Web applications typically request paths relative to the

View File

@ -692,4 +692,23 @@ func TestWorkspaceAppsProxySubdomain(t *testing.T) {
defer resp.Body.Close()
require.Equal(t, http.StatusBadGateway, resp.StatusCode)
})
t.Run("ProxyPortMinimumError", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
port := uint16(codersdk.MinimumListeningPort - 1)
resp, err := client.Request(ctx, http.MethodGet, proxyURL(t, port, "/", proxyTestAppQuery), nil)
require.NoError(t, err)
defer resp.Body.Close()
// Should have an error response.
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
var resBody codersdk.Response
err = json.NewDecoder(resp.Body).Decode(&resBody)
require.NoError(t, err)
require.Contains(t, resBody.Message, "Coder reserves ports less than")
})
}

View File

@ -4,7 +4,10 @@ import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"strconv"
"time"
@ -26,6 +29,20 @@ var (
TailnetSSHPort = 1
TailnetReconnectingPTYPort = 2
TailnetSpeedtestPort = 3
// TailnetStatisticsPort serves a HTTP server with endpoints for gathering
// agent statistics.
TailnetStatisticsPort = 4
// MinimumListeningPort is the minimum port that the listening-ports
// endpoint will return to the client, and the minimum port that is accepted
// by the proxy applications endpoint. Coder consumes ports 1-4 at the
// moment, and we reserve some extra ports for future use. Port 9 and up are
// available for the user.
//
// This is not enforced in the CLI intentionally as we don't really care
// *that* much. The user could bypass this in the CLI by using SSH instead
// anyways.
MinimumListeningPort = 9
)
// ReconnectingPTYRequest is sent from the client to the server
@ -153,3 +170,80 @@ func (c *AgentConn) DialContext(ctx context.Context, network string, addr string
}
return c.Conn.DialContextTCP(ctx, ipp)
}
func (c *AgentConn) statisticsClient() *http.Client {
return &http.Client{
Transport: &http.Transport{
// Disable keep alives as we're usually only making a single
// request, and this triggers goleak in tests
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
if network != "tcp" {
return nil, xerrors.Errorf("network must be tcp")
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, xerrors.Errorf("split host port %q: %w", addr, err)
}
// Verify that host is TailnetIP and port is
// TailnetStatisticsPort.
if host != TailnetIP.String() || port != strconv.Itoa(TailnetStatisticsPort) {
return nil, xerrors.Errorf("request %q does not appear to be for statistics server", addr)
}
conn, err := c.DialContextTCP(context.Background(), netip.AddrPortFrom(TailnetIP, uint16(TailnetStatisticsPort)))
if err != nil {
return nil, xerrors.Errorf("dial statistics: %w", err)
}
return conn, nil
},
},
}
}
func (c *AgentConn) doStatisticsRequest(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) {
host := net.JoinHostPort(TailnetIP.String(), strconv.Itoa(TailnetStatisticsPort))
url := fmt.Sprintf("http://%s%s", host, path)
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, xerrors.Errorf("new statistics server request to %q: %w", url, err)
}
return c.statisticsClient().Do(req)
}
type ListeningPortsResponse struct {
// If there are no ports in the list, nothing should be displayed in the UI.
// There must not be a "no ports available" message or anything similar, as
// there will always be no ports displayed on platforms where our port
// detection logic is unsupported.
Ports []ListeningPort `json:"ports"`
}
type ListeningPortNetwork string
const (
ListeningPortNetworkTCP ListeningPortNetwork = "tcp"
)
type ListeningPort struct {
ProcessName string `json:"process_name"` // may be empty
Network ListeningPortNetwork `json:"network"` // only "tcp" at the moment
Port uint16 `json:"port"`
}
func (c *AgentConn) ListeningPorts(ctx context.Context) (ListeningPortsResponse, error) {
res, err := c.doStatisticsRequest(ctx, http.MethodGet, "/api/v0/listening-ports", nil)
if err != nil {
return ListeningPortsResponse{}, xerrors.Errorf("do request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return ListeningPortsResponse{}, readBodyAsError(res)
}
var resp ListeningPortsResponse
return resp, json.NewDecoder(res.Body).Decode(&resp)
}

View File

@ -16,7 +16,7 @@ import (
// These cookies are Coder-specific. If a new one is added or changed, the name
// shouldn't be likely to conflict with any user-application set cookies.
// Be sure to strip additional cookies in httpapi.StripCoder Cookies!
// Be sure to strip additional cookies in httpapi.StripCoderCookies!
const (
// SessionTokenKey represents the name of the cookie or query parameter the API key is stored in.
SessionTokenKey = "coder_session_token"

View File

@ -520,6 +520,21 @@ func (c *Client) WorkspaceAgentReconnectingPTY(ctx context.Context, agentID, rec
return websocket.NetConn(ctx, conn, websocket.MessageBinary), nil
}
// WorkspaceAgentListeningPorts returns a list of ports that are currently being
// listened on inside the workspace agent's network namespace.
func (c *Client) WorkspaceAgentListeningPorts(ctx context.Context, agentID uuid.UUID) (ListeningPortsResponse, error) {
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/listening-ports", agentID), nil)
if err != nil {
return ListeningPortsResponse{}, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return ListeningPortsResponse{}, readBodyAsError(res)
}
var listeningPorts ListeningPortsResponse
return listeningPorts, json.NewDecoder(res.Body).Decode(&listeningPorts)
}
// Stats records the Agent's network connection statistics for use in
// user-facing metrics and debugging.
// Each member value must be written and read with atomic.

2
go.mod
View File

@ -156,6 +156,8 @@ require (
tailscale.com v1.30.0
)
require github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
require (
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect

2
go.sum
View File

@ -282,6 +282,8 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0Bsq
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc=
github.com/bytecodealliance/wasmtime-go v0.36.0 h1:B6thr7RMM9xQmouBtUqm1RpkJjuLS37m6nxX+iwsQSc=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=

View File

@ -288,6 +288,18 @@ export interface License {
readonly claims: Record<string, any>
}
// From codersdk/agentconn.go
export interface ListeningPort {
readonly process_name: string
readonly network: ListeningPortNetwork
readonly port: number
}
// From codersdk/agentconn.go
export interface ListeningPortsResponse {
readonly ports: ListeningPort[]
}
// From codersdk/users.go
export interface LoginWithPasswordRequest {
readonly email: string
@ -680,6 +692,9 @@ export type BuildReason = "autostart" | "autostop" | "initiator"
// From codersdk/features.go
export type Entitlement = "entitled" | "grace_period" | "not_entitled"
// From codersdk/agentconn.go
export type ListeningPortNetwork = "tcp"
// From codersdk/provisionerdaemons.go
export type LogLevel = "debug" | "error" | "info" | "trace" | "warn"