feat(cli): add trafficgen command for load testing (#7307)

This PR adds a scaletest workspace-traffic command for load testing. It opens a
ReconnectingPTY connection to each scaletest workspace (via coderd) and
concurrently writes random data to, and reads data from, the PTY. Payloads are of
the form #${RANDOM_ALPHANUMERIC_STRING}, which drops harmless garbage comments
into the remote shell and should not result in any commands being executed.
Cian Johnston 2023-05-05 10:34:58 +01:00 committed by GitHub
parent a172e073e3
commit 08fb9a6f1b
11 changed files with 787 additions and 79 deletions
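
For orientation, a condensed sketch of the payload framing implemented by the new workspacetraffic runner (see run.go below); the names come straight from the diff. At the defaults of 1024 bytes per 100ms tick, this amounts to roughly 10 KiB/s of writes per workspace:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/coder/coder/codersdk"
	"github.com/coder/coder/cryptorand"
)

func main() {
	// Each tick, the runner sends a "#"-prefixed random alphanumeric string.
	// The leading "#" makes the remote shell treat the payload as a comment,
	// so nothing is ever executed.
	randStr, err := cryptorand.String(15) // the runner uses bytesPerTick-1
	if err != nil {
		panic(err)
	}
	frame, err := json.Marshal(codersdk.ReconnectingPTYRequest{Data: "#" + randStr})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame)) // e.g. {"data":"#a1B2c3d4E5f6G7h"}
}
```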

View File

@@ -87,11 +87,13 @@ func (r *RootCmd) Core() []*clibase.Cmd {
// Workspace Commands
r.configSSH(),
r.rename(),
r.ping(),
r.create(),
r.deleteWorkspace(),
r.list(),
r.parameters(),
r.ping(),
r.rename(),
r.scaletest(),
r.schedules(),
r.show(),
r.speedtest(),
@@ -100,13 +102,11 @@ func (r *RootCmd) Core() []*clibase.Cmd {
r.stop(),
r.update(),
r.restart(),
r.parameters(),
// Hidden
r.workspaceAgent(),
r.scaletest(),
r.gitssh(),
r.vscodeSSH(),
r.workspaceAgent(),
}
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/scaletest/reconnectingpty"
"github.com/coder/coder/scaletest/workspacebuild"
"github.com/coder/coder/scaletest/workspacetraffic"
)
const scaletestTracerName = "coder_scaletest"
@@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd {
Children: []*clibase.Cmd{
r.scaletestCleanup(),
r.scaletestCreateWorkspaces(),
r.scaletestWorkspaceTraffic(),
},
}
@@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi
return tracerProvider, func(ctx context.Context) error {
var err error
closeTracingOnce.Do(func() {
err = closeTracing(ctx)
// Allow time to upload traces even if ctx is canceled
traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer traceCancel()
err = closeTracing(traceCtx)
})
return err
@@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
}
cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...")
var (
pageNumber = 0
limit = 100
workspaces []codersdk.Workspace
)
for {
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
Name: "scaletest-",
Offset: pageNumber * limit,
Limit: limit,
})
if err != nil {
return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
}
pageNumber++
if len(page.Workspaces) == 0 {
break
}
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces))
for _, w := range page.Workspaces {
if isScaleTestWorkspace(w) {
pageWorkspaces = append(pageWorkspaces, w)
}
}
workspaces = append(workspaces, pageWorkspaces...)
workspaces, err := getScaletestWorkspaces(ctx, client)
if err != nil {
return err
}
cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces))
@@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd {
}
cliui.Infof(inv.Stdout, "Fetching scaletest users...")
pageNumber = 0
limit = 100
var users []codersdk.User
for {
page, err := client.Users(ctx, codersdk.UsersRequest{
Search: "scaletest-",
Pagination: codersdk.Pagination{
Offset: pageNumber * limit,
Limit: limit,
},
})
if err != nil {
return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
}
pageNumber++
if len(page.Users) == 0 {
break
}
pageUsers := make([]codersdk.User, 0, len(page.Users))
for _, u := range page.Users {
if isScaleTestUser(u) {
pageUsers = append(pageUsers, u)
}
}
users = append(users, pageUsers...)
users, err := getScaletestUsers(ctx, client)
if err != nil {
return err
}
cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users))
@@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
}
defer func() {
// Allow time for traces to flush even if command context is
// canceled.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = closeTracing(ctx)
// canceled. This is a no-op if tracing is not enabled.
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
if err := closeTracing(ctx); err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}()
tracer := tracerProvider.Tracer(scaletestTracerName)
@@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
return xerrors.Errorf("cleanup tests: %w", err)
}
// Upload traces.
if tracingEnabled {
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
err := closeTracing(ctx)
if err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}
if res.TotalFail > 0 {
return xerrors.New("load test failed, see above for more details")
}
@@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd {
return cmd
}
func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
var (
tickInterval time.Duration
bytesPerTick int64
client = &codersdk.Client{}
tracingFlags = &scaletestTracingFlags{}
strategy = &scaletestStrategyFlags{}
cleanupStrategy = &scaletestStrategyFlags{cleanup: true}
output = &scaletestOutputFlags{}
)
cmd := &clibase.Cmd{
Use: "workspace-traffic",
Short: "Generate traffic to scaletest workspaces through coderd",
Middleware: clibase.Chain(
r.InitClient(client),
),
Handler: func(inv *clibase.Invocation) error {
ctx := inv.Context()
// Bypass rate limiting
client.HTTPClient = &http.Client{
Transport: &headerTransport{
transport: http.DefaultTransport,
header: map[string][]string{
codersdk.BypassRatelimitHeader: {"true"},
},
},
}
workspaces, err := getScaletestWorkspaces(inv.Context(), client)
if err != nil {
return err
}
if len(workspaces) == 0 {
return xerrors.Errorf("no scaletest workspaces exist")
}
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
if err != nil {
return xerrors.Errorf("create tracer provider: %w", err)
}
defer func() {
// Allow time for traces to flush even if command context is
// canceled. This is a no-op if tracing is not enabled.
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
if err := closeTracing(ctx); err != nil {
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
}
}()
tracer := tracerProvider.Tracer(scaletestTracerName)
outputs, err := output.parse()
if err != nil {
return xerrors.Errorf("could not parse --output flags")
}
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
for idx, ws := range workspaces {
var (
agentID uuid.UUID
name = "workspace-traffic"
id = strconv.Itoa(idx)
)
for _, res := range ws.LatestBuild.Resources {
if len(res.Agents) == 0 {
continue
}
agentID = res.Agents[0].ID
}
if agentID == uuid.Nil {
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
continue
}
// Set up our workspace agent connection.
config := workspacetraffic.Config{
AgentID: agentID,
BytesPerTick: bytesPerTick,
Duration: strategy.timeout,
TickInterval: tickInterval,
}
if err := config.Validate(); err != nil {
return xerrors.Errorf("validate config: %w", err)
}
var runner harness.Runnable = workspacetraffic.NewRunner(client, config)
if tracingEnabled {
runner = &runnableTraceWrapper{
tracer: tracer,
spanName: fmt.Sprintf("%s/%s", name, id),
runner: runner,
}
}
th.AddRun(name, id, runner)
}
_, _ = fmt.Fprintln(inv.Stderr, "Running load test...")
testCtx, testCancel := strategy.toContext(ctx)
defer testCancel()
err = th.Run(testCtx)
if err != nil {
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
}
res := th.Results()
for _, o := range outputs {
err = o.write(res, inv.Stdout)
if err != nil {
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err)
}
}
if res.TotalFail > 0 {
return xerrors.New("load test failed, see above for more details")
}
return nil
},
}
cmd.Options = []clibase.Option{
{
Flag: "bytes-per-tick",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK",
Default: "1024",
Description: "How much traffic to generate per tick.",
Value: clibase.Int64Of(&bytesPerTick),
},
{
Flag: "tick-interval",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL",
Default: "100ms",
Description: "How often to send traffic.",
Value: clibase.DurationOf(&tickInterval),
},
}
tracingFlags.attach(&cmd.Options)
strategy.attach(&cmd.Options)
cleanupStrategy.attach(&cmd.Options)
output.attach(&cmd.Options)
return cmd
}
type runnableTraceWrapper struct {
tracer trace.Tracer
spanName string
@@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool {
return strings.HasPrefix(workspace.OwnerName, "scaletest-") ||
strings.HasPrefix(workspace.Name, "scaletest-")
}
func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) {
var (
pageNumber = 0
limit = 100
workspaces []codersdk.Workspace
)
for {
page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
Name: "scaletest-",
Offset: pageNumber * limit,
Limit: limit,
})
if err != nil {
return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err)
}
pageNumber++
if len(page.Workspaces) == 0 {
break
}
pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces))
for _, w := range page.Workspaces {
if isScaleTestWorkspace(w) {
pageWorkspaces = append(pageWorkspaces, w)
}
}
workspaces = append(workspaces, pageWorkspaces...)
}
return workspaces, nil
}
func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) {
var (
pageNumber = 0
limit = 100
users []codersdk.User
)
for {
page, err := client.Users(ctx, codersdk.UsersRequest{
Search: "scaletest-",
Pagination: codersdk.Pagination{
Offset: pageNumber * limit,
Limit: limit,
},
})
if err != nil {
return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err)
}
pageNumber++
if len(page.Users) == 0 {
break
}
pageUsers := make([]codersdk.User, 0, len(page.Users))
for _, u := range page.Users {
if isScaleTestUser(u) {
pageUsers = append(pageUsers, u)
}
}
users = append(users, pageUsers...)
}
return users, nil
}
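
One detail worth calling out in the tracing change above: `closeTracing` is now invoked with a context derived from `context.Background()` rather than the command context, so trace upload gets a 10-second window even after the command is canceled. The pattern in isolation (the `flush`/`upload` names here are illustrative, not from the diff):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// flush gives the exporter its own detached deadline, so a canceled command
// context cannot cut the trace upload short.
func flush(upload func(context.Context) error) error {
	traceCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return upload(traceCtx)
}

func main() {
	err := flush(func(ctx context.Context) error {
		deadline, _ := ctx.Deadline()
		fmt.Println("uploading traces until", deadline.Format(time.RFC3339))
		return nil
	})
	fmt.Println("flush error:", err)
}
```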

View File

@@ -1,24 +1,30 @@
package cli_test
import (
"bytes"
"context"
"encoding/json"
"os"
"path/filepath"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/agent"
"github.com/coder/coder/cli/clitest"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/pty/ptytest"
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/testutil"
)
func TestScaleTest(t *testing.T) {
func TestScaleTestCreateWorkspaces(t *testing.T) {
t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942")
t.Parallel()
@@ -198,3 +204,71 @@ param3: 1
require.Len(t, users.Users, 1)
})
}
// This test pretends to stand up a workspace and run a no-op traffic generation test.
// It's not a real test, but it's useful for debugging.
// We do not perform any cleanup.
func TestScaleTestWorkspaceTraffic(t *testing.T) {
t.Parallel()
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancelFunc()
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Name: "agent",
Auth: &proto.Agent_Token{
Token: authToken,
},
Apps: []*proto.App{},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID, func(cwr *codersdk.CreateWorkspaceRequest) {
cwr.Name = "scaletest-test"
})
coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID)
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)
agentCloser := agent.New(agent.Options{
Client: agentClient,
})
t.Cleanup(func() {
_ = agentCloser.Close()
})
coderdtest.AwaitWorkspaceAgents(t, client, ws.ID)
inv, root := clitest.New(t, "scaletest", "workspace-traffic",
"--timeout", "1s",
"--bytes-per-tick", "1024",
"--tick-interval", "100ms",
)
clitest.SetupConfig(t, client, root)
var stdout, stderr bytes.Buffer
inv.Stdout = &stdout
inv.Stderr = &stderr
err := inv.WithContext(ctx).Run()
require.NoError(t, err)
require.Contains(t, stdout.String(), "Pass: 1")
}

View File

@@ -10,6 +10,7 @@ Run a scale test against the Coder API
online. Optionally runs a command inside each
workspace, and connects to the workspace over
WireGuard.
workspace-traffic Generate traffic to scaletest workspaces through coderd
---
Run `coder --help` for a list of global options.

View File

@@ -0,0 +1,55 @@
Usage: coder scaletest workspace-traffic [flags]
Generate traffic to scaletest workspaces through coderd
Options
--bytes-per-tick int, $CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK (default: 1024)
How much traffic to generate per tick.
--cleanup-concurrency int, $CODER_SCALETEST_CLEANUP_CONCURRENCY (default: 1)
Number of concurrent cleanup jobs to run. 0 means unlimited.
--cleanup-job-timeout duration, $CODER_SCALETEST_CLEANUP_JOB_TIMEOUT (default: 5m)
Timeout per job. Jobs may take longer to complete under higher
concurrency limits.
--cleanup-timeout duration, $CODER_SCALETEST_CLEANUP_TIMEOUT (default: 30m)
Timeout for the entire cleanup run. 0 means unlimited.
--concurrency int, $CODER_SCALETEST_CONCURRENCY (default: 1)
Number of concurrent jobs to run. 0 means unlimited.
--job-timeout duration, $CODER_SCALETEST_JOB_TIMEOUT (default: 5m)
Timeout per job. Jobs may take longer to complete under higher
concurrency limits.
--output string-array, $CODER_SCALETEST_OUTPUTS (default: text)
Output format specs in the format "<format>[:<path>]". Not specifying
a path will default to stdout. Available formats: text, json.
--tick-interval duration, $CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL (default: 100ms)
How often to send traffic.
--timeout duration, $CODER_SCALETEST_TIMEOUT (default: 30m)
Timeout for the entire test run. 0 means unlimited.
--trace bool, $CODER_SCALETEST_TRACE
Whether application tracing data is collected. It exports to a backend
configured by environment variables. See:
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md.
--trace-coder bool, $CODER_SCALETEST_TRACE_CODER
Whether opentelemetry traces are sent to Coder. We recommend keeping
this disabled unless we advise you to enable it.
--trace-honeycomb-api-key string, $CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY
Enables trace exporting to Honeycomb.io using the provided API key.
--trace-propagate bool, $CODER_SCALETEST_TRACE_PROPAGATE
Enables trace propagation to the Coder backend, which will be used to
correlate server-side spans with client-side spans. Only enable this
if the server is configured with the exact same tracing configuration
as the client.
---
Run `coder --help` for a list of global options.

View File

@@ -165,9 +165,9 @@ type WorkspaceAgentReconnectingPTYInit struct {
// to pipe data to a PTY.
// @typescript-ignore ReconnectingPTYRequest
type ReconnectingPTYRequest struct {
Data string `json:"data"`
Height uint16 `json:"height"`
Width uint16 `json:"width"`
Data string `json:"data,omitempty"`
Height uint16 `json:"height,omitempty"`
Width uint16 `json:"width,omitempty"`
}
// ReconnectingPTY spawns a new reconnecting terminal session.
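
The `omitempty` additions matter here because the traffic runner encodes one request per tick; without them, every data frame would also carry zero-valued height and width fields. A quick sketch of the wire-format difference (standard encoding/json semantics):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/coder/coder/codersdk"
)

func main() {
	frame, err := json.Marshal(codersdk.ReconnectingPTYRequest{Data: "#abc"})
	if err != nil {
		panic(err)
	}
	// Without omitempty: {"data":"#abc","height":0,"width":0}
	// With omitempty:    {"data":"#abc"}
	fmt.Println(string(frame))
}
```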

View File

@@ -16,3 +16,4 @@ coder scaletest
| ------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [<code>cleanup</code>](./scaletest_cleanup.md) | Cleanup scaletest workspaces, then cleanup scaletest users. |
| [<code>create-workspaces</code>](./scaletest_create-workspaces.md) | Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard. |
| [<code>workspace-traffic</code>](./scaletest_workspace-traffic.md) | Generate traffic to scaletest workspaces through coderd |

View File

@@ -0,0 +1,139 @@
<!-- DO NOT EDIT | GENERATED CONTENT -->
# scaletest workspace-traffic
Generate traffic to scaletest workspaces through coderd
## Usage
```console
coder scaletest workspace-traffic [flags]
```
## Options
### --bytes-per-tick
| | |
| ----------- | -------------------------------------------------------------- |
| Type | <code>int</code> |
| Environment | <code>$CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK</code> |
| Default | <code>1024</code> |
How much traffic to generate per tick.
### --cleanup-concurrency
| | |
| ----------- | ------------------------------------------------- |
| Type | <code>int</code> |
| Environment | <code>$CODER_SCALETEST_CLEANUP_CONCURRENCY</code> |
| Default | <code>1</code> |
Number of concurrent cleanup jobs to run. 0 means unlimited.
### --cleanup-job-timeout
| | |
| ----------- | ------------------------------------------------- |
| Type | <code>duration</code> |
| Environment | <code>$CODER_SCALETEST_CLEANUP_JOB_TIMEOUT</code> |
| Default | <code>5m</code> |
Timeout per job. Jobs may take longer to complete under higher concurrency limits.
### --cleanup-timeout
| | |
| ----------- | --------------------------------------------- |
| Type | <code>duration</code> |
| Environment | <code>$CODER_SCALETEST_CLEANUP_TIMEOUT</code> |
| Default | <code>30m</code> |
Timeout for the entire cleanup run. 0 means unlimited.
### --concurrency
| | |
| ----------- | ----------------------------------------- |
| Type | <code>int</code> |
| Environment | <code>$CODER_SCALETEST_CONCURRENCY</code> |
| Default | <code>1</code> |
Number of concurrent jobs to run. 0 means unlimited.
### --job-timeout
| | |
| ----------- | ----------------------------------------- |
| Type | <code>duration</code> |
| Environment | <code>$CODER_SCALETEST_JOB_TIMEOUT</code> |
| Default | <code>5m</code> |
Timeout per job. Jobs may take longer to complete under higher concurrency limits.
### --output
| | |
| ----------- | ------------------------------------- |
| Type | <code>string-array</code> |
| Environment | <code>$CODER_SCALETEST_OUTPUTS</code> |
| Default | <code>text</code> |
Output format specs in the format "<format>[:<path>]". Not specifying a path will default to stdout. Available formats: text, json.
### --tick-interval
| | |
| ----------- | ------------------------------------------------------------- |
| Type | <code>duration</code> |
| Environment | <code>$CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL</code> |
| Default | <code>100ms</code> |
How often to send traffic.
### --timeout
| | |
| ----------- | ------------------------------------- |
| Type | <code>duration</code> |
| Environment | <code>$CODER_SCALETEST_TIMEOUT</code> |
| Default | <code>30m</code> |
Timeout for the entire test run. 0 means unlimited.
### --trace
| | |
| ----------- | ----------------------------------- |
| Type | <code>bool</code> |
| Environment | <code>$CODER_SCALETEST_TRACE</code> |
Whether application tracing data is collected. It exports to a backend configured by environment variables. See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md.
### --trace-coder
| | |
| ----------- | ----------------------------------------- |
| Type | <code>bool</code> |
| Environment | <code>$CODER_SCALETEST_TRACE_CODER</code> |
Whether opentelemetry traces are sent to Coder. We recommend keeping this disabled unless we advise you to enable it.
### --trace-honeycomb-api-key
| | |
| ----------- | ----------------------------------------------------- |
| Type | <code>string</code> |
| Environment | <code>$CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY</code> |
Enables trace exporting to Honeycomb.io using the provided API key.
### --trace-propagate
| | |
| ----------- | --------------------------------------------- |
| Type | <code>bool</code> |
| Environment | <code>$CODER_SCALETEST_TRACE_PROPAGATE</code> |
Enables trace propagation to the Coder backend, which will be used to correlate server-side spans with client-side spans. Only enable this if the server is configured with the exact same tracing configuration as the client.

View File

@@ -637,6 +637,11 @@
"description": "Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard.",
"path": "cli/scaletest_create-workspaces.md"
},
{
"title": "scaletest workspace-traffic",
"description": "Generate traffic to scaletest workspaces through coderd",
"path": "cli/scaletest_workspace-traffic.md"
},
{
"title": "schedule",
"description": "Schedule automated start and stop times for workspaces",

View File

@@ -0,0 +1,43 @@
package workspacetraffic
import (
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
)
type Config struct {
// AgentID is the workspace agent ID to which to connect.
AgentID uuid.UUID `json:"agent_id"`
// BytesPerTick is the number of bytes to send to the agent per tick.
BytesPerTick int64 `json:"bytes_per_tick"`
// Duration is the total duration for which to send traffic to the agent.
Duration time.Duration `json:"duration"`
// TickInterval specifies the interval between ticks (that is, attempts to
// send data to workspace agents).
TickInterval time.Duration `json:"tick_interval"`
}
func (c Config) Validate() error {
if c.AgentID == uuid.Nil {
return xerrors.Errorf("validate agent_id: must not be nil")
}
if c.BytesPerTick <= 0 {
return xerrors.Errorf("validate bytes_per_tick: must be greater than zero")
}
if c.Duration <= 0 {
return xerrors.Errorf("validate duration: must be greater than zero")
}
if c.TickInterval <= 0 {
return xerrors.Errorf("validate tick_interval: must be greater than zero")
}
return nil
}
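
A small usage sketch of the config above; leaving Duration at zero shows the shape of the errors Validate returns:

```go
package main

import (
	"fmt"
	"time"

	"github.com/google/uuid"

	"github.com/coder/coder/scaletest/workspacetraffic"
)

func main() {
	cfg := workspacetraffic.Config{
		AgentID:      uuid.New(), // placeholder; the CLI resolves this from build resources
		BytesPerTick: 1024,
		TickInterval: 100 * time.Millisecond,
		// Duration intentionally left zero to trigger validation.
	}
	if err := cfg.Validate(); err != nil {
		fmt.Println(err) // validate duration: must be greater than zero
		return
	}
	fmt.Println("config ok")
}
```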

View File

@@ -0,0 +1,224 @@
package workspacetraffic
import (
"context"
"encoding/json"
"io"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/scaletest/loadtestutil"
)
type Runner struct {
client *codersdk.Client
cfg Config
}
var (
_ harness.Runnable = &Runner{}
_ harness.Cleanable = &Runner{}
)
func NewRunner(client *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: client,
cfg: cfg,
}
}
func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
ctx, span := tracing.StartSpan(ctx)
defer span.End()
logs = loadtestutil.NewSyncWriter(logs)
logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
r.client.Logger = logger
r.client.LogBodies = true
var (
agentID = r.cfg.AgentID
reconnect = uuid.New()
height uint16 = 25
width uint16 = 80
tickInterval = r.cfg.TickInterval
bytesPerTick = r.cfg.BytesPerTick
)
logger.Info(ctx, "config",
slog.F("agent_id", agentID),
slog.F("reconnect", reconnect),
slog.F("height", height),
slog.F("width", width),
slog.F("tick_interval", tickInterval),
slog.F("bytes_per_tick", bytesPerTick),
)
// Set a deadline for stopping the test.
start := time.Now()
deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration))
defer cancel()
logger.Debug(ctx, "connect to workspace agent", slog.F("agent_id", agentID))
conn, err := r.client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{
AgentID: agentID,
Reconnect: reconnect,
Height: height,
Width: width,
Command: "/bin/sh",
})
if err != nil {
logger.Error(ctx, "connect to workspace agent", slog.F("agent_id", agentID), slog.Error(err))
return xerrors.Errorf("connect to workspace: %w", err)
}
go func() {
<-deadlineCtx.Done()
logger.Debug(ctx, "close agent connection", slog.F("agent_id", agentID))
_ = conn.Close()
}()
// Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd.
crw := countReadWriter{ReadWriter: conn}
// Create a ticker for sending data to the PTY.
tick := time.NewTicker(tickInterval)
defer tick.Stop()
// Now we begin writing random data to the pty.
rch := make(chan error, 1)
wch := make(chan error, 1)
// Read forever in the background.
go func() {
logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID))
rch <- drain(&crw)
logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID))
close(rch)
}()
// Write random data to the PTY every tick.
go func() {
logger.Debug(ctx, "writing to agent", slog.F("agent_id", agentID))
wch <- writeRandomData(&crw, bytesPerTick, tick.C)
logger.Debug(ctx, "done writing to agent", slog.F("agent_id", agentID))
close(wch)
}()
// Write until the context is canceled.
if wErr := <-wch; wErr != nil {
return xerrors.Errorf("write to pty: %w", wErr)
}
if rErr := <-rch; rErr != nil {
return xerrors.Errorf("read from pty: %w", rErr)
}
duration := time.Since(start)
logger.Info(ctx, "results",
slog.F("duration", duration),
slog.F("sent", crw.BytesWritten()),
slog.F("rcvd", crw.BytesRead()),
)
return nil
}
// Cleanup does nothing, successfully.
func (*Runner) Cleanup(context.Context, string) error {
return nil
}
// drain discards reads from src until it returns io.EOF or the connection's
// deadline is exceeded.
func drain(src io.Reader) error {
if _, err := io.Copy(io.Discard, src); err != nil {
if xerrors.Is(err, context.DeadlineExceeded) {
return nil
}
if xerrors.As(err, &websocket.CloseError{}) {
return nil
}
return err
}
return nil
}
func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error {
var (
enc = json.NewEncoder(dst)
ptyReq = codersdk.ReconnectingPTYRequest{}
)
for range tick {
payload := "#" + mustRandStr(size-1)
ptyReq.Data = payload
if err := enc.Encode(ptyReq); err != nil {
if xerrors.Is(err, context.DeadlineExceeded) {
return nil
}
if xerrors.As(err, &websocket.CloseError{}) {
return nil
}
return err
}
}
return nil
}
// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written.
type countReadWriter struct {
io.ReadWriter
bytesRead atomic.Int64
bytesWritten atomic.Int64
}
func (w *countReadWriter) Read(p []byte) (int, error) {
n, err := w.ReadWriter.Read(p)
if err == nil {
w.bytesRead.Add(int64(n))
}
return n, err
}
func (w *countReadWriter) Write(p []byte) (int, error) {
n, err := w.ReadWriter.Write(p)
if err == nil {
w.bytesWritten.Add(int64(n))
}
return n, err
}
func (w *countReadWriter) BytesRead() int64 {
return w.bytesRead.Load()
}
func (w *countReadWriter) BytesWritten() int64 {
return w.bytesWritten.Load()
}
func mustRandStr(l int64) string {
if l < 1 {
l = 1
}
randStr, err := cryptorand.String(int(l))
if err != nil {
panic(err)
}
return randStr
}
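
Finally, a hedged sketch of driving the runner directly, outside the CLI and test harness. The deployment URL, token source, and agent ID are placeholders; the CLI resolves the agent ID from the workspace's latest build resources:

```go
package main

import (
	"context"
	"net/url"
	"os"
	"time"

	"github.com/google/uuid"

	"github.com/coder/coder/codersdk"
	"github.com/coder/coder/scaletest/workspacetraffic"
)

func main() {
	u, err := url.Parse("https://coder.example.com") // placeholder deployment URL
	if err != nil {
		panic(err)
	}
	client := codersdk.New(u)
	client.SetSessionToken(os.Getenv("CODER_SESSION_TOKEN"))

	runner := workspacetraffic.NewRunner(client, workspacetraffic.Config{
		AgentID:      uuid.MustParse("00000000-0000-0000-0000-000000000001"), // placeholder
		BytesPerTick: 1024,
		Duration:     10 * time.Second,
		TickInterval: 100 * time.Millisecond,
	})
	// Run streams its own logs to the supplied writer.
	if err := runner.Run(context.Background(), "0", os.Stderr); err != nil {
		panic(err)
	}
}
```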