chore: move drpc transport tools to codersdk/drpc (#11224)

Part of #10532

DRPC transport over yamux and in-mem pipes was previously only used on the provisioner APIs, but now will also be used in tailnet.  Moved to subpackage of codersdk to avoid import loops.
This commit is contained in:
Spike Curtis 2023-12-15 12:41:39 +04:00 committed by GitHub
parent b36071c6bb
commit 9a4e1100fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 28 additions and 18 deletions

View File

@ -90,6 +90,7 @@ import (
stringutil "github.com/coder/coder/v2/coderd/util/strings"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisioner/terraform"
@ -1298,7 +1299,7 @@ func newProvisionerDaemon(
connector := provisionerd.LocalProvisioners{}
if cfg.Provisioner.DaemonsEcho {
echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
wg.Add(1)
go func() {
defer wg.Done()
@ -1332,7 +1333,7 @@ func newProvisionerDaemon(
}
tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
wg.Add(1)
go func() {
defer wg.Done()

View File

@ -66,6 +66,7 @@ import (
"github.com/coder/coder/v2/coderd/wsconncache"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/site"
@ -1159,7 +1160,7 @@ func compressHandler(h http.Handler) http.Handler {
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string) (client proto.DRPCProvisionerDaemonClient, err error) {
tracer := api.TracerProvider.Tracer(tracing.TracerName)
clientSession, serverSession := provisionersdk.MemTransportPipe()
clientSession, serverSession := drpc.MemTransportPipe()
defer func() {
if err != nil {
_ = clientSession.Close()

View File

@ -73,6 +73,7 @@ import (
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisionerd"
@ -512,7 +513,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
// seems t.TempDir() is not safe to call from a different goroutine
workDir := t.TempDir()
echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
_ = echoClient.Close()
@ -547,7 +548,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
}
func NewExternalProvisionerDaemon(t testing.TB, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
serveDone := make(chan struct{})
t.Cleanup(func() {

View File

@ -37,6 +37,7 @@ import (
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
@ -542,8 +543,8 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
default:
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
}
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
if protobuf.Size(protoJob) > drpc.MaxMessageSize {
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), drpc.MaxMessageSize))
}
return protoJob, err

View File

@ -1,4 +1,4 @@
package provisionersdk
package drpc
import (
"context"

View File

@ -15,9 +15,9 @@ import (
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner"
"github.com/coder/coder/v2/provisionersdk"
)
type LogSource string
@ -252,7 +252,7 @@ func (c *Client) ServeProvisionerDaemon(ctx context.Context, req ServeProvisione
_ = wsNetConn.Close()
return nil, xerrors.Errorf("multiplex client: %w", err)
}
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.MultiplexedConn(session)), nil
return proto.NewDRPCProvisionerDaemonClient(drpc.MultiplexedConn(session)), nil
}
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func

View File

@ -20,6 +20,7 @@ import (
"github.com/coder/coder/v2/cli/cliutil"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/terraform"
"github.com/coder/coder/v2/provisionerd"
provisionerdproto "github.com/coder/coder/v2/provisionerd/proto"
@ -115,7 +116,7 @@ func (r *RootCmd) provisionerDaemonStart() *clibase.Cmd {
return err
}
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
go func() {
<-ctx.Done()
_ = terraformClient.Close()

View File

@ -16,6 +16,7 @@ import (
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/coder/v2/provisioner/echo"
@ -228,7 +229,7 @@ func TestProvisionerDaemonServe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
go func() {
<-ctx.Done()
_ = terraformClient.Close()

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
@ -19,7 +20,7 @@ func TestEcho(t *testing.T) {
workdir := t.TempDir()
// Create an in-memory provisioner to communicate with.
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
_ = client.Close()

View File

@ -22,6 +22,7 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/terraform"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
@ -38,7 +39,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
}
cachePath := t.TempDir()
workDir := t.TempDir()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
serverErr := make(chan error, 1)
t.Cleanup(func() {

View File

@ -23,6 +23,7 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
@ -1093,7 +1094,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
return &proto.Empty{}, nil
}
}
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
clientPipe, serverPipe := drpc.MemTransportPipe()
t.Cleanup(func() {
_ = clientPipe.Close()
_ = serverPipe.Close()
@ -1129,7 +1130,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
// to the server implementation provided.
func createProvisionerClient(t *testing.T, done <-chan struct{}, server provisionerTestServer) sdkproto.DRPCProvisionerClient {
t.Helper()
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
clientPipe, serverPipe := drpc.MemTransportPipe()
t.Cleanup(func() {
_ = clientPipe.Close()
_ = serverPipe.Close()

View File

@ -10,6 +10,7 @@ import (
"go.uber.org/goleak"
"storj.io/drpc/drpcconn"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/testutil"
@ -23,7 +24,7 @@ func TestProvisionerSDK(t *testing.T) {
t.Parallel()
t.Run("ServeListener", func(t *testing.T) {
t.Parallel()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
defer client.Close()
defer server.Close()
@ -65,7 +66,7 @@ func TestProvisionerSDK(t *testing.T) {
t.Run("ServeClosedPipe", func(t *testing.T) {
t.Parallel()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
_ = client.Close()
_ = server.Close()