mirror of https://github.com/coder/coder.git
fix: Use in-memory filesystem for echo provisioner tests (#2408)
* fix: Use in-memory filesystem for echo provisioner tests
This should reduce IO in CI to shave some time off tests!
* test: Increase timeouts to reduce flakes
It's difficult to understand what's timing out due to a lock
vs. taking a long time. This should help resolve! 🕵️
This commit is contained in:
parent
5e673cc544
commit
024ab6df57
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/pion/turn/v2"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/spf13/afero"
|
||||
"github.com/spf13/cobra"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"golang.org/x/oauth2"
|
||||
|
@ -550,7 +551,7 @@ func newProvisionerDaemon(ctx context.Context, coderAPI *coderd.API,
|
|||
if dev {
|
||||
echoClient, echoServer := provisionersdk.TransportPipe()
|
||||
go func() {
|
||||
err := echo.Serve(ctx, &provisionersdk.ServeOptions{Listener: echoServer})
|
||||
err := echo.Serve(ctx, afero.NewOsFs(), &provisionersdk.ServeOptions{Listener: echoServer})
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
|
||||
"github.com/coder/coder/coderd/rbac"
|
||||
"github.com/coder/coder/coderd/util/ptr"
|
||||
|
||||
|
@ -190,18 +192,20 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
|
|||
_ = echoServer.Close()
|
||||
cancelFunc()
|
||||
})
|
||||
fs := afero.NewMemMapFs()
|
||||
go func() {
|
||||
err := echo.Serve(ctx, &provisionersdk.ServeOptions{
|
||||
err := echo.Serve(ctx, fs, &provisionersdk.ServeOptions{
|
||||
Listener: echoServer,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
closer := provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{
|
||||
Filesystem: fs,
|
||||
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
|
||||
PollInterval: 50 * time.Millisecond,
|
||||
UpdateInterval: 250 * time.Millisecond,
|
||||
ForceCancelInterval: 250 * time.Millisecond,
|
||||
PollInterval: 10 * time.Millisecond,
|
||||
UpdateInterval: 25 * time.Millisecond,
|
||||
ForceCancelInterval: 25 * time.Millisecond,
|
||||
Provisioners: provisionerd.Provisioners{
|
||||
string(database.ProvisionerTypeEcho): proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
|
||||
},
|
||||
|
|
2
go.mod
2
go.mod
|
@ -227,7 +227,7 @@ require (
|
|||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
github.com/spf13/afero v1.8.2 // indirect
|
||||
github.com/spf13/afero v1.8.2
|
||||
github.com/spf13/cast v1.5.0 // indirect
|
||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||
github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af // indirect
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
|
||||
"github.com/coder/coder/provisionersdk"
|
||||
"github.com/coder/coder/provisionersdk/proto"
|
||||
)
|
||||
|
@ -31,20 +32,24 @@ var (
|
|||
)
|
||||
|
||||
// Serve starts the echo provisioner.
|
||||
func Serve(ctx context.Context, options *provisionersdk.ServeOptions) error {
|
||||
return provisionersdk.Serve(ctx, &echo{}, options)
|
||||
func Serve(ctx context.Context, filesystem afero.Fs, options *provisionersdk.ServeOptions) error {
|
||||
return provisionersdk.Serve(ctx, &echo{
|
||||
filesystem: filesystem,
|
||||
}, options)
|
||||
}
|
||||
|
||||
// The echo provisioner serves as a dummy provisioner primarily
|
||||
// used for testing. It echos responses from JSON files in the
|
||||
// format %d.protobuf. It's used for testing.
|
||||
type echo struct{}
|
||||
type echo struct {
|
||||
filesystem afero.Fs
|
||||
}
|
||||
|
||||
// Parse reads requests from the provided directory to stream responses.
|
||||
func (*echo) Parse(request *proto.Parse_Request, stream proto.DRPCProvisioner_ParseStream) error {
|
||||
func (e *echo) Parse(request *proto.Parse_Request, stream proto.DRPCProvisioner_ParseStream) error {
|
||||
for index := 0; ; index++ {
|
||||
path := filepath.Join(request.Directory, fmt.Sprintf("%d.parse.protobuf", index))
|
||||
_, err := os.Stat(path)
|
||||
_, err := e.filesystem.Stat(path)
|
||||
if err != nil {
|
||||
if index == 0 {
|
||||
// Error if nothing is around to enable failed states.
|
||||
|
@ -52,7 +57,7 @@ func (*echo) Parse(request *proto.Parse_Request, stream proto.DRPCProvisioner_Pa
|
|||
}
|
||||
break
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
data, err := afero.ReadFile(e.filesystem, path)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("read file %q: %w", path, err)
|
||||
}
|
||||
|
@ -71,7 +76,7 @@ func (*echo) Parse(request *proto.Parse_Request, stream proto.DRPCProvisioner_Pa
|
|||
}
|
||||
|
||||
// Provision reads requests from the provided directory to stream responses.
|
||||
func (*echo) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
|
||||
func (e *echo) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -83,7 +88,7 @@ func (*echo) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
|
|||
extension = ".dry.protobuf"
|
||||
}
|
||||
path := filepath.Join(request.Directory, fmt.Sprintf("%d.provision"+extension, index))
|
||||
_, err := os.Stat(path)
|
||||
_, err := e.filesystem.Stat(path)
|
||||
if err != nil {
|
||||
if index == 0 {
|
||||
// Error if nothing is around to enable failed states.
|
||||
|
@ -91,7 +96,7 @@ func (*echo) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
|
|||
}
|
||||
break
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
data, err := afero.ReadFile(e.filesystem, path)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("read file %q: %w", path, err)
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -20,6 +21,7 @@ import (
|
|||
func TestEcho(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fs := afero.NewMemMapFs()
|
||||
// Create an in-memory provisioner to communicate with.
|
||||
client, server := provisionersdk.TransportPipe()
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
@ -29,7 +31,7 @@ func TestEcho(t *testing.T) {
|
|||
cancelFunc()
|
||||
})
|
||||
go func() {
|
||||
err := echo.Serve(ctx, &provisionersdk.ServeOptions{
|
||||
err := echo.Serve(ctx, fs, &provisionersdk.ServeOptions{
|
||||
Listener: server,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
@ -59,7 +61,7 @@ func TestEcho(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
client, err := api.Parse(ctx, &proto.Parse_Request{
|
||||
Directory: unpackTar(t, data),
|
||||
Directory: unpackTar(t, fs, data),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
log, err := client.Recv()
|
||||
|
@ -98,7 +100,7 @@ func TestEcho(t *testing.T) {
|
|||
err = client.Send(&proto.Provision_Request{
|
||||
Type: &proto.Provision_Request_Start{
|
||||
Start: &proto.Provision_Start{
|
||||
Directory: unpackTar(t, data),
|
||||
Directory: unpackTar(t, fs, data),
|
||||
},
|
||||
},
|
||||
})
|
||||
|
@ -113,7 +115,7 @@ func TestEcho(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func unpackTar(t *testing.T, data []byte) string {
|
||||
func unpackTar(t *testing.T, fs afero.Fs, data []byte) string {
|
||||
directory := t.TempDir()
|
||||
reader := tar.NewReader(bytes.NewReader(data))
|
||||
for {
|
||||
|
@ -123,7 +125,7 @@ func unpackTar(t *testing.T, data []byte) string {
|
|||
}
|
||||
// #nosec
|
||||
path := filepath.Join(directory, header.Name)
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
|
||||
file, err := fs.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
|
||||
require.NoError(t, err)
|
||||
_, err = io.CopyN(file, reader, 1<<20)
|
||||
require.ErrorIs(t, err, io.EOF)
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hashicorp/yamux"
|
||||
"github.com/spf13/afero"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
|
@ -45,7 +46,8 @@ type Provisioners map[string]sdkproto.DRPCProvisionerClient
|
|||
|
||||
// Options provides customizations to the behavior of a provisioner daemon.
|
||||
type Options struct {
|
||||
Logger slog.Logger
|
||||
Filesystem afero.Fs
|
||||
Logger slog.Logger
|
||||
|
||||
ForceCancelInterval time.Duration
|
||||
UpdateInterval time.Duration
|
||||
|
@ -65,6 +67,9 @@ func New(clientDialer Dialer, opts *Options) *Server {
|
|||
if opts.ForceCancelInterval == 0 {
|
||||
opts.ForceCancelInterval = time.Minute
|
||||
}
|
||||
if opts.Filesystem == nil {
|
||||
opts.Filesystem = afero.NewOsFs()
|
||||
}
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
daemon := &Server{
|
||||
clientDialer: clientDialer,
|
||||
|
@ -303,7 +308,7 @@ func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
|
|||
defer func() {
|
||||
// Cleanup the work directory after execution.
|
||||
for attempt := 0; attempt < 5; attempt++ {
|
||||
err := os.RemoveAll(p.opts.WorkDirectory)
|
||||
err := p.opts.Filesystem.RemoveAll(p.opts.WorkDirectory)
|
||||
if err != nil {
|
||||
// On Windows, open files cannot be removed.
|
||||
// When the provisioner daemon is shutting down,
|
||||
|
@ -326,7 +331,7 @@ func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
|
|||
return
|
||||
}
|
||||
|
||||
err := os.MkdirAll(p.opts.WorkDirectory, 0700)
|
||||
err := p.opts.Filesystem.MkdirAll(p.opts.WorkDirectory, 0700)
|
||||
if err != nil {
|
||||
p.failActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
|
||||
return
|
||||
|
@ -374,14 +379,14 @@ func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
|
|||
}
|
||||
switch header.Typeflag {
|
||||
case tar.TypeDir:
|
||||
err = os.MkdirAll(headerPath, mode)
|
||||
err = p.opts.Filesystem.MkdirAll(headerPath, mode)
|
||||
if err != nil {
|
||||
p.failActiveJobf("mkdir %q: %s", headerPath, err)
|
||||
return
|
||||
}
|
||||
p.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", headerPath))
|
||||
case tar.TypeReg:
|
||||
file, err := os.OpenFile(headerPath, os.O_CREATE|os.O_RDWR, mode)
|
||||
file, err := p.opts.Filesystem.OpenFile(headerPath, os.O_CREATE|os.O_RDWR, mode)
|
||||
if err != nil {
|
||||
p.failActiveJobf("create file %q (mode %s): %s", headerPath, mode, err)
|
||||
return
|
||||
|
@ -470,7 +475,7 @@ func (p *Server) runReadmeParse(ctx context.Context, job *proto.AcquiredJob) {
|
|||
return
|
||||
}
|
||||
|
||||
fi, err := os.ReadFile(path.Join(p.opts.WorkDirectory, ReadmeFile))
|
||||
fi, err := afero.ReadFile(p.opts.Filesystem, path.Join(p.opts.WorkDirectory, ReadmeFile))
|
||||
if err != nil {
|
||||
_, err := client.UpdateJob(ctx, &proto.UpdateJobRequest{
|
||||
JobId: job.GetJobId(),
|
||||
|
|
Loading…
Reference in New Issue