From 08fb9a6f1bd082a203743a31836413ed8e6ed2dd Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 5 May 2023 10:34:58 +0100 Subject: [PATCH] feat(cli): add trafficgen command for load testing (#7307) This PR adds a scaletest workspace-traffic command for load testing. This opens a ReconnectingPTY connection to each scaletest workspace (via coderd) and concurrently writes and reads random data to/from the PTY. Payloads are of the form #${RANDOM_ALPHANUMERIC_STRING}, which essentially drops garbage comments in the remote shell, and should not result in any commands being executed. --- cli/root.go | 10 +- cli/scaletest.go | 306 ++++++++++++++---- cli/scaletest_test.go | 76 ++++- cli/testdata/coder_scaletest_--help.golden | 1 + ..._scaletest_workspace-traffic_--help.golden | 55 ++++ codersdk/workspaceagentconn.go | 6 +- docs/cli/scaletest.md | 1 + docs/cli/scaletest_workspace-traffic.md | 139 ++++++++ docs/manifest.json | 5 + scaletest/workspacetraffic/config.go | 43 +++ scaletest/workspacetraffic/run.go | 224 +++++++++++++ 11 files changed, 787 insertions(+), 79 deletions(-) create mode 100644 cli/testdata/coder_scaletest_workspace-traffic_--help.golden create mode 100644 docs/cli/scaletest_workspace-traffic.md create mode 100644 scaletest/workspacetraffic/config.go create mode 100644 scaletest/workspacetraffic/run.go diff --git a/cli/root.go b/cli/root.go index aa61a4eb57..6702754abb 100644 --- a/cli/root.go +++ b/cli/root.go @@ -87,11 +87,13 @@ func (r *RootCmd) Core() []*clibase.Cmd { // Workspace Commands r.configSSH(), - r.rename(), - r.ping(), r.create(), r.deleteWorkspace(), r.list(), + r.parameters(), + r.ping(), + r.rename(), + r.scaletest(), r.schedules(), r.show(), r.speedtest(), @@ -100,13 +102,11 @@ func (r *RootCmd) Core() []*clibase.Cmd { r.stop(), r.update(), r.restart(), - r.parameters(), // Hidden - r.workspaceAgent(), - r.scaletest(), r.gitssh(), r.vscodeSSH(), + r.workspaceAgent(), } } diff --git a/cli/scaletest.go b/cli/scaletest.go index be3eb22ac6..67186da221 100644 --- a/cli/scaletest.go +++ b/cli/scaletest.go @@ -28,6 +28,7 @@ import ( "github.com/coder/coder/scaletest/harness" "github.com/coder/coder/scaletest/reconnectingpty" "github.com/coder/coder/scaletest/workspacebuild" + "github.com/coder/coder/scaletest/workspacetraffic" ) const scaletestTracerName = "coder_scaletest" @@ -42,6 +43,7 @@ func (r *RootCmd) scaletest() *clibase.Cmd { Children: []*clibase.Cmd{ r.scaletestCleanup(), r.scaletestCreateWorkspaces(), + r.scaletestWorkspaceTraffic(), }, } @@ -107,7 +109,10 @@ func (s *scaletestTracingFlags) provider(ctx context.Context) (trace.TracerProvi return tracerProvider, func(ctx context.Context) error { var err error closeTracingOnce.Do(func() { - err = closeTracing(ctx) + // Allow time to upload traces even if ctx is canceled + traceCtx, traceCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer traceCancel() + err = closeTracing(traceCtx) }) return err @@ -384,33 +389,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { } cliui.Infof(inv.Stdout, "Fetching scaletest workspaces...") - var ( - pageNumber = 0 - limit = 100 - workspaces []codersdk.Workspace - ) - for { - page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ - Name: "scaletest-", - Offset: pageNumber * limit, - Limit: limit, - }) - if err != nil { - return xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) - } - - pageNumber++ - if len(page.Workspaces) == 0 { - break - } - - pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) - for _, w := range page.Workspaces { - if isScaleTestWorkspace(w) { - pageWorkspaces = append(pageWorkspaces, w) - } - } - workspaces = append(workspaces, pageWorkspaces...) + workspaces, err := getScaletestWorkspaces(ctx, client) + if err != nil { + return err } cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces)) @@ -441,33 +422,9 @@ func (r *RootCmd) scaletestCleanup() *clibase.Cmd { } cliui.Infof(inv.Stdout, "Fetching scaletest users...") - pageNumber = 0 - limit = 100 - var users []codersdk.User - for { - page, err := client.Users(ctx, codersdk.UsersRequest{ - Search: "scaletest-", - Pagination: codersdk.Pagination{ - Offset: pageNumber * limit, - Limit: limit, - }, - }) - if err != nil { - return xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) - } - - pageNumber++ - if len(page.Users) == 0 { - break - } - - pageUsers := make([]codersdk.User, 0, len(page.Users)) - for _, u := range page.Users { - if isScaleTestUser(u) { - pageUsers = append(pageUsers, u) - } - } - users = append(users, pageUsers...) + users, err := getScaletestUsers(ctx, client) + if err != nil { + return err } cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users)) @@ -683,10 +640,11 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { } defer func() { // Allow time for traces to flush even if command context is - // canceled. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _ = closeTracing(ctx) + // canceled. This is a no-op if tracing is not enabled. + _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") + if err := closeTracing(ctx); err != nil { + _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) + } }() tracer := tracerProvider.Tracer(scaletestTracerName) @@ -800,17 +758,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return xerrors.Errorf("cleanup tests: %w", err) } - // Upload traces. - if tracingEnabled { - _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") - ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) - defer cancel() - err := closeTracing(ctx) - if err != nil { - _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) - } - } - if res.TotalFail > 0 { return xerrors.New("load test failed, see above for more details") } @@ -947,6 +894,156 @@ func (r *RootCmd) scaletestCreateWorkspaces() *clibase.Cmd { return cmd } +func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { + var ( + tickInterval time.Duration + bytesPerTick int64 + client = &codersdk.Client{} + tracingFlags = &scaletestTracingFlags{} + strategy = &scaletestStrategyFlags{} + cleanupStrategy = &scaletestStrategyFlags{cleanup: true} + output = &scaletestOutputFlags{} + ) + + cmd := &clibase.Cmd{ + Use: "workspace-traffic", + Short: "Generate traffic to scaletest workspaces through coderd", + Middleware: clibase.Chain( + r.InitClient(client), + ), + Handler: func(inv *clibase.Invocation) error { + ctx := inv.Context() + + // Bypass rate limiting + client.HTTPClient = &http.Client{ + Transport: &headerTransport{ + transport: http.DefaultTransport, + header: map[string][]string{ + codersdk.BypassRatelimitHeader: {"true"}, + }, + }, + } + + workspaces, err := getScaletestWorkspaces(inv.Context(), client) + if err != nil { + return err + } + + if len(workspaces) == 0 { + return xerrors.Errorf("no scaletest workspaces exist") + } + + tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx) + if err != nil { + return xerrors.Errorf("create tracer provider: %w", err) + } + defer func() { + // Allow time for traces to flush even if command context is + // canceled. This is a no-op if tracing is not enabled. + _, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...") + if err := closeTracing(ctx); err != nil { + _, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err) + } + }() + tracer := tracerProvider.Tracer(scaletestTracerName) + + outputs, err := output.parse() + if err != nil { + return xerrors.Errorf("could not parse --output flags") + } + + th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy()) + for idx, ws := range workspaces { + var ( + agentID uuid.UUID + name = "workspace-traffic" + id = strconv.Itoa(idx) + ) + + for _, res := range ws.LatestBuild.Resources { + if len(res.Agents) == 0 { + continue + } + agentID = res.Agents[0].ID + } + + if agentID == uuid.Nil { + _, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name) + continue + } + + // Setup our workspace agent connection. + config := workspacetraffic.Config{ + AgentID: agentID, + BytesPerTick: bytesPerTick, + Duration: strategy.timeout, + TickInterval: tickInterval, + } + + if err := config.Validate(); err != nil { + return xerrors.Errorf("validate config: %w", err) + } + var runner harness.Runnable = workspacetraffic.NewRunner(client, config) + if tracingEnabled { + runner = &runnableTraceWrapper{ + tracer: tracer, + spanName: fmt.Sprintf("%s/%s", name, id), + runner: runner, + } + } + + th.AddRun(name, id, runner) + } + + _, _ = fmt.Fprintln(inv.Stderr, "Running load test...") + testCtx, testCancel := strategy.toContext(ctx) + defer testCancel() + err = th.Run(testCtx) + if err != nil { + return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err) + } + + res := th.Results() + for _, o := range outputs { + err = o.write(res, inv.Stdout) + if err != nil { + return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err) + } + } + + if res.TotalFail > 0 { + return xerrors.New("load test failed, see above for more details") + } + + return nil + }, + } + + cmd.Options = []clibase.Option{ + { + Flag: "bytes-per-tick", + Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK", + Default: "1024", + Description: "How much traffic to generate per tick.", + Value: clibase.Int64Of(&bytesPerTick), + }, + { + Flag: "tick-interval", + Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL", + Default: "100ms", + Description: "How often to send traffic.", + Value: clibase.DurationOf(&tickInterval), + }, + } + + tracingFlags.attach(&cmd.Options) + strategy.attach(&cmd.Options) + cleanupStrategy.attach(&cmd.Options) + output.attach(&cmd.Options) + + return cmd +} + type runnableTraceWrapper struct { tracer trace.Tracer spanName string @@ -1023,3 +1120,72 @@ func isScaleTestWorkspace(workspace codersdk.Workspace) bool { return strings.HasPrefix(workspace.OwnerName, "scaletest-") || strings.HasPrefix(workspace.Name, "scaletest-") } + +func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client) ([]codersdk.Workspace, error) { + var ( + pageNumber = 0 + limit = 100 + workspaces []codersdk.Workspace + ) + + for { + page, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{ + Name: "scaletest-", + Offset: pageNumber * limit, + Limit: limit, + }) + if err != nil { + return nil, xerrors.Errorf("fetch scaletest workspaces page %d: %w", pageNumber, err) + } + + pageNumber++ + if len(page.Workspaces) == 0 { + break + } + + pageWorkspaces := make([]codersdk.Workspace, 0, len(page.Workspaces)) + for _, w := range page.Workspaces { + if isScaleTestWorkspace(w) { + pageWorkspaces = append(pageWorkspaces, w) + } + } + workspaces = append(workspaces, pageWorkspaces...) + } + return workspaces, nil +} + +func getScaletestUsers(ctx context.Context, client *codersdk.Client) ([]codersdk.User, error) { + var ( + pageNumber = 0 + limit = 100 + users []codersdk.User + ) + + for { + page, err := client.Users(ctx, codersdk.UsersRequest{ + Search: "scaletest-", + Pagination: codersdk.Pagination{ + Offset: pageNumber * limit, + Limit: limit, + }, + }) + if err != nil { + return nil, xerrors.Errorf("fetch scaletest users page %d: %w", pageNumber, err) + } + + pageNumber++ + if len(page.Users) == 0 { + break + } + + pageUsers := make([]codersdk.User, 0, len(page.Users)) + for _, u := range page.Users { + if isScaleTestUser(u) { + pageUsers = append(pageUsers, u) + } + } + users = append(users, pageUsers...) + } + + return users, nil +} diff --git a/cli/scaletest_test.go b/cli/scaletest_test.go index 3636b8ef40..b026e7636b 100644 --- a/cli/scaletest_test.go +++ b/cli/scaletest_test.go @@ -1,24 +1,30 @@ package cli_test import ( + "bytes" "context" "encoding/json" "os" "path/filepath" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/coder/coder/agent" "github.com/coder/coder/cli/clitest" "github.com/coder/coder/coderd/coderdtest" "github.com/coder/coder/codersdk" + "github.com/coder/coder/codersdk/agentsdk" + "github.com/coder/coder/provisioner/echo" + "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/pty/ptytest" "github.com/coder/coder/scaletest/harness" "github.com/coder/coder/testutil" ) -func TestScaleTest(t *testing.T) { +func TestScaleTestCreateWorkspaces(t *testing.T) { t.Skipf("This test is flakey. See https://github.com/coder/coder/issues/4942") t.Parallel() @@ -198,3 +204,71 @@ param3: 1 require.Len(t, users.Users, 1) }) } + +// This test pretends to stand up a workspace and run a no-op traffic generation test. +// It's not a real test, but it's useful for debugging. +// We do not perform any cleanup. +func TestScaleTestWorkspaceTraffic(t *testing.T) { + t.Parallel() + + ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancelFunc() + + client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + user := coderdtest.CreateFirstUser(t, client) + + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionPlan: echo.ProvisionComplete, + ProvisionApply: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Name: "agent", + Auth: &proto.Agent_Token{ + Token: authToken, + }, + Apps: []*proto.App{}, + }}, + }}, + }, + }, + }}, + }) + template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) + coderdtest.AwaitTemplateVersionJob(t, client, version.ID) + + ws := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID, func(cwr *codersdk.CreateWorkspaceRequest) { + cwr.Name = "scaletest-test" + }) + coderdtest.AwaitWorkspaceBuildJob(t, client, ws.LatestBuild.ID) + + agentClient := agentsdk.New(client.URL) + agentClient.SetSessionToken(authToken) + agentCloser := agent.New(agent.Options{ + Client: agentClient, + }) + t.Cleanup(func() { + _ = agentCloser.Close() + }) + + coderdtest.AwaitWorkspaceAgents(t, client, ws.ID) + + inv, root := clitest.New(t, "scaletest", "workspace-traffic", + "--timeout", "1s", + "--bytes-per-tick", "1024", + "--tick-interval", "100ms", + ) + clitest.SetupConfig(t, client, root) + var stdout, stderr bytes.Buffer + inv.Stdout = &stdout + inv.Stderr = &stderr + err := inv.WithContext(ctx).Run() + require.NoError(t, err) + require.Contains(t, stdout.String(), "Pass: 1") +} diff --git a/cli/testdata/coder_scaletest_--help.golden b/cli/testdata/coder_scaletest_--help.golden index 37c7d4d10d..6ab343cd33 100644 --- a/cli/testdata/coder_scaletest_--help.golden +++ b/cli/testdata/coder_scaletest_--help.golden @@ -10,6 +10,7 @@ Run a scale test against the Coder API online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard. + workspace-traffic Generate traffic to scaletest workspaces through coderd --- Run `coder --help` for a list of global options. diff --git a/cli/testdata/coder_scaletest_workspace-traffic_--help.golden b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden new file mode 100644 index 0000000000..b7de6ca960 --- /dev/null +++ b/cli/testdata/coder_scaletest_workspace-traffic_--help.golden @@ -0,0 +1,55 @@ +Usage: coder scaletest workspace-traffic [flags] + +Generate traffic to scaletest workspaces through coderd + +Options + --bytes-per-tick int, $CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK (default: 1024) + How much traffic to generate per tick. + + --cleanup-concurrency int, $CODER_SCALETEST_CLEANUP_CONCURRENCY (default: 1) + Number of concurrent cleanup jobs to run. 0 means unlimited. + + --cleanup-job-timeout duration, $CODER_SCALETEST_CLEANUP_JOB_TIMEOUT (default: 5m) + Timeout per job. Jobs may take longer to complete under higher + concurrency limits. + + --cleanup-timeout duration, $CODER_SCALETEST_CLEANUP_TIMEOUT (default: 30m) + Timeout for the entire cleanup run. 0 means unlimited. + + --concurrency int, $CODER_SCALETEST_CONCURRENCY (default: 1) + Number of concurrent jobs to run. 0 means unlimited. + + --job-timeout duration, $CODER_SCALETEST_JOB_TIMEOUT (default: 5m) + Timeout per job. Jobs may take longer to complete under higher + concurrency limits. + + --output string-array, $CODER_SCALETEST_OUTPUTS (default: text) + Output format specs in the format "[:]". Not specifying + a path will default to stdout. Available formats: text, json. + + --tick-interval duration, $CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL (default: 100ms) + How often to send traffic. + + --timeout duration, $CODER_SCALETEST_TIMEOUT (default: 30m) + Timeout for the entire test run. 0 means unlimited. + + --trace bool, $CODER_SCALETEST_TRACE + Whether application tracing data is collected. It exports to a backend + configured by environment variables. See: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md. + + --trace-coder bool, $CODER_SCALETEST_TRACE_CODER + Whether opentelemetry traces are sent to Coder. We recommend keeping + this disabled unless we advise you to enable it. + + --trace-honeycomb-api-key string, $CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY + Enables trace exporting to Honeycomb.io using the provided API key. + + --trace-propagate bool, $CODER_SCALETEST_TRACE_PROPAGATE + Enables trace propagation to the Coder backend, which will be used to + correlate server-side spans with client-side spans. Only enable this + if the server is configured with the exact same tracing configuration + as the client. + +--- +Run `coder --help` for a list of global options. diff --git a/codersdk/workspaceagentconn.go b/codersdk/workspaceagentconn.go index 0095ac0e13..64bd4fe2f8 100644 --- a/codersdk/workspaceagentconn.go +++ b/codersdk/workspaceagentconn.go @@ -165,9 +165,9 @@ type WorkspaceAgentReconnectingPTYInit struct { // to pipe data to a PTY. // @typescript-ignore ReconnectingPTYRequest type ReconnectingPTYRequest struct { - Data string `json:"data"` - Height uint16 `json:"height"` - Width uint16 `json:"width"` + Data string `json:"data,omitempty"` + Height uint16 `json:"height,omitempty"` + Width uint16 `json:"width,omitempty"` } // ReconnectingPTY spawns a new reconnecting terminal session. diff --git a/docs/cli/scaletest.md b/docs/cli/scaletest.md index aae17ff7ba..dd9eb4c646 100644 --- a/docs/cli/scaletest.md +++ b/docs/cli/scaletest.md @@ -16,3 +16,4 @@ coder scaletest | ------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | [cleanup](./scaletest_cleanup.md) | Cleanup scaletest workspaces, then cleanup scaletest users. | | [create-workspaces](./scaletest_create-workspaces.md) | Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard. | +| [workspace-traffic](./scaletest_workspace-traffic.md) | Generate traffic to scaletest workspaces through coderd | diff --git a/docs/cli/scaletest_workspace-traffic.md b/docs/cli/scaletest_workspace-traffic.md new file mode 100644 index 0000000000..5303847345 --- /dev/null +++ b/docs/cli/scaletest_workspace-traffic.md @@ -0,0 +1,139 @@ + + +# scaletest workspace-traffic + +Generate traffic to scaletest workspaces through coderd + +## Usage + +```console +coder scaletest workspace-traffic [flags] +``` + +## Options + +### --bytes-per-tick + +| | | +| ----------- | -------------------------------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_BYTES_PER_TICK | +| Default | 1024 | + +How much traffic to generate per tick. + +### --cleanup-concurrency + +| | | +| ----------- | ------------------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_CLEANUP_CONCURRENCY | +| Default | 1 | + +Number of concurrent cleanup jobs to run. 0 means unlimited. + +### --cleanup-job-timeout + +| | | +| ----------- | ------------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_CLEANUP_JOB_TIMEOUT | +| Default | 5m | + +Timeout per job. Jobs may take longer to complete under higher concurrency limits. + +### --cleanup-timeout + +| | | +| ----------- | --------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_CLEANUP_TIMEOUT | +| Default | 30m | + +Timeout for the entire cleanup run. 0 means unlimited. + +### --concurrency + +| | | +| ----------- | ----------------------------------------- | +| Type | int | +| Environment | $CODER_SCALETEST_CONCURRENCY | +| Default | 1 | + +Number of concurrent jobs to run. 0 means unlimited. + +### --job-timeout + +| | | +| ----------- | ----------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_JOB_TIMEOUT | +| Default | 5m | + +Timeout per job. Jobs may take longer to complete under higher concurrency limits. + +### --output + +| | | +| ----------- | ------------------------------------- | +| Type | string-array | +| Environment | $CODER_SCALETEST_OUTPUTS | +| Default | text | + +Output format specs in the format "[:]". Not specifying a path will default to stdout. Available formats: text, json. + +### --tick-interval + +| | | +| ----------- | ------------------------------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_WORKSPACE_TRAFFIC_TICK_INTERVAL | +| Default | 100ms | + +How often to send traffic. + +### --timeout + +| | | +| ----------- | ------------------------------------- | +| Type | duration | +| Environment | $CODER_SCALETEST_TIMEOUT | +| Default | 30m | + +Timeout for the entire test run. 0 means unlimited. + +### --trace + +| | | +| ----------- | ----------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE | + +Whether application tracing data is collected. It exports to a backend configured by environment variables. See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md. + +### --trace-coder + +| | | +| ----------- | ----------------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE_CODER | + +Whether opentelemetry traces are sent to Coder. We recommend keeping this disabled unless we advise you to enable it. + +### --trace-honeycomb-api-key + +| | | +| ----------- | ----------------------------------------------------- | +| Type | string | +| Environment | $CODER_SCALETEST_TRACE_HONEYCOMB_API_KEY | + +Enables trace exporting to Honeycomb.io using the provided API key. + +### --trace-propagate + +| | | +| ----------- | --------------------------------------------- | +| Type | bool | +| Environment | $CODER_SCALETEST_TRACE_PROPAGATE | + +Enables trace propagation to the Coder backend, which will be used to correlate server-side spans with client-side spans. Only enable this if the server is configured with the exact same tracing configuration as the client. diff --git a/docs/manifest.json b/docs/manifest.json index 367b5dbe2c..9ee2f2a28e 100644 --- a/docs/manifest.json +++ b/docs/manifest.json @@ -637,6 +637,11 @@ "description": "Creates many users, then creates a workspace for each user and waits for them finish building and fully come online. Optionally runs a command inside each workspace, and connects to the workspace over WireGuard.", "path": "cli/scaletest_create-workspaces.md" }, + { + "title": "scaletest workspace-traffic", + "description": "Generate traffic to scaletest workspaces through coderd", + "path": "cli/scaletest_workspace-traffic.md" + }, { "title": "schedule", "description": "Schedule automated start and stop times for workspaces", diff --git a/scaletest/workspacetraffic/config.go b/scaletest/workspacetraffic/config.go new file mode 100644 index 0000000000..abf7b7b771 --- /dev/null +++ b/scaletest/workspacetraffic/config.go @@ -0,0 +1,43 @@ +package workspacetraffic + +import ( + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" +) + +type Config struct { + // AgentID is the workspace agent ID to which to connect. + AgentID uuid.UUID `json:"agent_id"` + + // BytesPerTick is the number of bytes to send to the agent per tick. + BytesPerTick int64 `json:"bytes_per_tick"` + + // Duration is the total duration for which to send traffic to the agent. + Duration time.Duration `json:"duration"` + + // TickInterval specifies the interval between ticks (that is, attempts to + // send data to workspace agents). + TickInterval time.Duration `json:"tick_interval"` +} + +func (c Config) Validate() error { + if c.AgentID == uuid.Nil { + return xerrors.Errorf("validate agent_id: must not be nil") + } + + if c.BytesPerTick <= 0 { + return xerrors.Errorf("validate bytes_per_tick: must be greater than zero") + } + + if c.Duration <= 0 { + return xerrors.Errorf("validate duration: must be greater than zero") + } + + if c.TickInterval <= 0 { + return xerrors.Errorf("validate tick_interval: must be greater than zero") + } + + return nil +} diff --git a/scaletest/workspacetraffic/run.go b/scaletest/workspacetraffic/run.go new file mode 100644 index 0000000000..863e26e958 --- /dev/null +++ b/scaletest/workspacetraffic/run.go @@ -0,0 +1,224 @@ +package workspacetraffic + +import ( + "context" + "encoding/json" + "io" + "sync/atomic" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + "nhooyr.io/websocket" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" + + "github.com/coder/coder/coderd/tracing" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/cryptorand" + "github.com/coder/coder/scaletest/harness" + "github.com/coder/coder/scaletest/loadtestutil" +) + +type Runner struct { + client *codersdk.Client + cfg Config +} + +var ( + _ harness.Runnable = &Runner{} + _ harness.Cleanable = &Runner{} +) + +func NewRunner(client *codersdk.Client, cfg Config) *Runner { + return &Runner{ + client: client, + cfg: cfg, + } +} + +func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { + ctx, span := tracing.StartSpan(ctx) + defer span.End() + + logs = loadtestutil.NewSyncWriter(logs) + logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug) + r.client.Logger = logger + r.client.LogBodies = true + + var ( + agentID = r.cfg.AgentID + reconnect = uuid.New() + height uint16 = 25 + width uint16 = 80 + tickInterval = r.cfg.TickInterval + bytesPerTick = r.cfg.BytesPerTick + ) + + logger.Info(ctx, "config", + slog.F("agent_id", agentID), + slog.F("reconnect", reconnect), + slog.F("height", height), + slog.F("width", width), + slog.F("tick_interval", tickInterval), + slog.F("bytes_per_tick", bytesPerTick), + ) + + // Set a deadline for stopping the text. + start := time.Now() + deadlineCtx, cancel := context.WithDeadline(ctx, start.Add(r.cfg.Duration)) + defer cancel() + logger.Debug(ctx, "connect to workspace agent", slog.F("agent_id", agentID)) + + conn, err := r.client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{ + AgentID: agentID, + Reconnect: reconnect, + Height: height, + Width: width, + Command: "/bin/sh", + }) + if err != nil { + logger.Error(ctx, "connect to workspace agent", slog.F("agent_id", agentID), slog.Error(err)) + return xerrors.Errorf("connect to workspace: %w", err) + } + + go func() { + <-deadlineCtx.Done() + logger.Debug(ctx, "close agent connection", slog.F("agent_id", agentID)) + _ = conn.Close() + }() + + // Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd. + crw := countReadWriter{ReadWriter: conn} + + // Create a ticker for sending data to the PTY. + tick := time.NewTicker(tickInterval) + defer tick.Stop() + + // Now we begin writing random data to the pty. + rch := make(chan error, 1) + wch := make(chan error, 1) + + go func() { + <-deadlineCtx.Done() + logger.Debug(ctx, "closing agent connection") + conn.Close() + }() + + // Read forever in the background. + go func() { + logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID)) + rch <- drain(&crw) + logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID)) + close(rch) + }() + + // Write random data to the PTY every tick. + go func() { + logger.Debug(ctx, "writing to agent", slog.F("agent_id", agentID)) + wch <- writeRandomData(&crw, bytesPerTick, tick.C) + logger.Debug(ctx, "done writing to agent", slog.F("agent_id", agentID)) + close(wch) + }() + + // Write until the context is canceled. + if wErr := <-wch; wErr != nil { + return xerrors.Errorf("write to pty: %w", wErr) + } + if rErr := <-rch; rErr != nil { + return xerrors.Errorf("read from pty: %w", rErr) + } + + duration := time.Since(start) + + logger.Info(ctx, "results", + slog.F("duration", duration), + slog.F("sent", crw.BytesWritten()), + slog.F("rcvd", crw.BytesRead()), + ) + + return nil +} + +// Cleanup does nothing, successfully. +func (*Runner) Cleanup(context.Context, string) error { + return nil +} + +// drain drains from src until it returns io.EOF or ctx times out. +func drain(src io.Reader) error { + if _, err := io.Copy(io.Discard, src); err != nil { + if xerrors.Is(err, context.DeadlineExceeded) { + return nil + } + if xerrors.As(err, &websocket.CloseError{}) { + return nil + } + return err + } + return nil +} + +func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error { + var ( + enc = json.NewEncoder(dst) + ptyReq = codersdk.ReconnectingPTYRequest{} + ) + for range tick { + payload := "#" + mustRandStr(size-1) + ptyReq.Data = payload + if err := enc.Encode(ptyReq); err != nil { + if xerrors.Is(err, context.DeadlineExceeded) { + return nil + } + if xerrors.As(err, &websocket.CloseError{}) { + return nil + } + return err + } + } + return nil +} + +// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written. +type countReadWriter struct { + io.ReadWriter + bytesRead atomic.Int64 + bytesWritten atomic.Int64 +} + +func (w *countReadWriter) Read(p []byte) (int, error) { + n, err := w.ReadWriter.Read(p) + if err == nil { + w.bytesRead.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) Write(p []byte) (int, error) { + n, err := w.ReadWriter.Write(p) + if err == nil { + w.bytesWritten.Add(int64(n)) + } + return n, err +} + +func (w *countReadWriter) BytesRead() int64 { + return w.bytesRead.Load() +} + +func (w *countReadWriter) BytesWritten() int64 { + return w.bytesWritten.Load() +} + +func mustRandStr(l int64) string { + if l < 1 { + l = 1 + } + randStr, err := cryptorand.String(int(l)) + if err != nil { + panic(err) + } + return randStr +}