From ab3b3d5fca7771d9409517028ebdf06e732c7e48 Mon Sep 17 00:00:00 2001
From: Colin Adler
Date: Thu, 1 Dec 2022 16:54:53 -0600
Subject: [PATCH] feat: add debouncing to provisionerd rpc calls (#5198)

---
 .gitignore                              |  1 +
 cli/deployment/config.go                | 12 ++++++
 cli/deployment/config_test.go           | 33 ++++++++-------
 cli/root_test.go                        |  2 +
 cli/server.go                           |  7 +++-
 cli/testdata/coder_server_--help.golden |  9 ++++
 coderd/coderd.go                        |  4 +-
 coderd/coderdtest/coderdtest.go         |  4 +-
 codersdk/deploymentconfig.go            |  2 +
 enterprise/cli/provisionerdaemons.go    | 27 +++++++-----
 enterprise/coderd/provisionerdaemons.go |  1 -
 provisionerd/provisionerd.go            | 55 ++++++++++++++++++++-----
 provisionerd/provisionerd_test.go       | 10 ++---
 site/src/api/typesGenerated.ts          |  2 +
 14 files changed, 123 insertions(+), 46 deletions(-)

diff --git a/.gitignore b/.gitignore
index 25c5e1c798..b401caa56c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ vendor
 .eslintcache
 yarn-error.log
 gotests.coverage
+gotests.xml
 .idea
 .gitpod.yml
 .DS_Store
diff --git a/cli/deployment/config.go b/cli/deployment/config.go
index a28918b562..9e00d9f764 100644
--- a/cli/deployment/config.go
+++ b/cli/deployment/config.go
@@ -388,6 +388,18 @@ func newConfig() *codersdk.DeploymentConfig {
 			Flag:    "provisioner-daemons",
 			Default: 3,
 		},
+		DaemonPollInterval: &codersdk.DeploymentConfigField[time.Duration]{
+			Name:    "Poll Interval",
+			Usage:   "Time to wait before polling for a new job.",
+			Flag:    "provisioner-daemon-poll-interval",
+			Default: time.Second,
+		},
+		DaemonPollJitter: &codersdk.DeploymentConfigField[time.Duration]{
+			Name:    "Poll Jitter",
+			Usage:   "Random jitter added to the poll interval.",
+			Flag:    "provisioner-daemon-poll-jitter",
+			Default: 100 * time.Millisecond,
+		},
 		ForceCancelInterval: &codersdk.DeploymentConfigField[time.Duration]{
 			Name:    "Force Cancel Interval",
 			Usage:   "Time to force cancel provisioning tasks that are stuck.",
diff --git a/cli/deployment/config_test.go b/cli/deployment/config_test.go
index 1994da924f..2f20754bcc 100644
--- a/cli/deployment/config_test.go
+++ b/cli/deployment/config_test.go
@@ -2,6 +2,7 @@ package deployment_test
 
 import (
 	"testing"
+	"time"
 
 	"github.com/spf13/pflag"
 	"github.com/stretchr/testify/require"
@@ -25,20 +26,22 @@ func TestConfig(t *testing.T) {
 	}{{
 		Name: "Deployment",
 		Env: map[string]string{
-			"CODER_ADDRESS":              "0.0.0.0:8443",
-			"CODER_ACCESS_URL":           "https://dev.coder.com",
-			"CODER_PG_CONNECTION_URL":    "some-url",
-			"CODER_PPROF_ADDRESS":        "something",
-			"CODER_PPROF_ENABLE":         "true",
-			"CODER_PROMETHEUS_ADDRESS":   "hello-world",
-			"CODER_PROMETHEUS_ENABLE":    "true",
-			"CODER_PROVISIONER_DAEMONS":  "5",
-			"CODER_SECURE_AUTH_COOKIE":   "true",
-			"CODER_SSH_KEYGEN_ALGORITHM": "potato",
-			"CODER_TELEMETRY":            "false",
-			"CODER_TELEMETRY_TRACE":      "false",
-			"CODER_WILDCARD_ACCESS_URL":  "something-wildcard.com",
-			"CODER_UPDATE_CHECK":         "false",
+			"CODER_ADDRESS":                          "0.0.0.0:8443",
+			"CODER_ACCESS_URL":                       "https://dev.coder.com",
+			"CODER_PG_CONNECTION_URL":                "some-url",
+			"CODER_PPROF_ADDRESS":                    "something",
+			"CODER_PPROF_ENABLE":                     "true",
+			"CODER_PROMETHEUS_ADDRESS":               "hello-world",
+			"CODER_PROMETHEUS_ENABLE":                "true",
+			"CODER_PROVISIONER_DAEMONS":              "5",
+			"CODER_PROVISIONER_DAEMON_POLL_INTERVAL": "5s",
+			"CODER_PROVISIONER_DAEMON_POLL_JITTER":   "1s",
+			"CODER_SECURE_AUTH_COOKIE":               "true",
+			"CODER_SSH_KEYGEN_ALGORITHM":             "potato",
+			"CODER_TELEMETRY":                        "false",
+			"CODER_TELEMETRY_TRACE":                  "false",
+			"CODER_WILDCARD_ACCESS_URL":              "something-wildcard.com",
+			"CODER_UPDATE_CHECK":                     "false",
 		},
 		Valid: func(config *codersdk.DeploymentConfig) {
 			require.Equal(t, config.Address.Value, "0.0.0.0:8443")
@@ -49,6 +52,8 @@ func TestConfig(t *testing.T) {
 			require.Equal(t, config.Prometheus.Address.Value, "hello-world")
 			require.Equal(t, config.Prometheus.Enable.Value, true)
 			require.Equal(t, config.Provisioner.Daemons.Value, 5)
+			require.Equal(t, config.Provisioner.DaemonPollInterval.Value, 5*time.Second)
+			require.Equal(t, config.Provisioner.DaemonPollJitter.Value, 1*time.Second)
 			require.Equal(t, config.SecureAuthCookie.Value, true)
 			require.Equal(t, config.SSHKeygenAlgorithm.Value, "potato")
 			require.Equal(t, config.Telemetry.Enable.Value, false)
diff --git a/cli/root_test.go b/cli/root_test.go
index 1ff3d18d36..bb34e9b6da 100644
--- a/cli/root_test.go
+++ b/cli/root_test.go
@@ -22,6 +22,8 @@ import (
 	"github.com/coder/coder/testutil"
 )
 
+// To update the golden files:
+// make update-golden-files
 var updateGoldenFiles = flag.Bool("update", false, "update .golden files")
 
 //nolint:tparallel,paralleltest // These test sets env vars.
diff --git a/cli/server.go b/cli/server.go
index 70e31eaa15..ca0cd26524 100644
--- a/cli/server.go
+++ b/cli/server.go
@@ -970,13 +970,16 @@ func newProvisionerDaemon(
 		}()
 		provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(echoClient)
 	}
+	debounce := time.Second
 	return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
 		// This debounces calls to listen every second. Read the comment
 		// in provisionerdserver.go to learn more!
-		return coderAPI.CreateInMemoryProvisionerDaemon(ctx, time.Second)
+		return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
 	}, &provisionerd.Options{
 		Logger:              logger,
-		PollInterval:        500 * time.Millisecond,
+		JobPollInterval:     cfg.Provisioner.DaemonPollInterval.Value,
+		JobPollJitter:       cfg.Provisioner.DaemonPollJitter.Value,
+		JobPollDebounce:     debounce,
 		UpdateInterval:      500 * time.Millisecond,
 		ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value,
 		Provisioners:        provisioners,
diff --git a/cli/testdata/coder_server_--help.golden b/cli/testdata/coder_server_--help.golden
index 80627b157f..9470d6126c 100644
--- a/cli/testdata/coder_server_--help.golden
+++ b/cli/testdata/coder_server_--help.golden
@@ -128,6 +128,15 @@ Flags:
       --prometheus-enable                        Serve prometheus metrics on the address
                                                  defined by prometheus address.
                                                  Consumes $CODER_PROMETHEUS_ENABLE
+      --provisioner-daemon-poll-interval duration Time to wait before polling for a new
+                                                 job.
+                                                 Consumes
+                                                 $CODER_PROVISIONER_DAEMON_POLL_INTERVAL
+                                                 (default 1s)
+      --provisioner-daemon-poll-jitter duration  Random jitter added to the poll interval.
+                                                 Consumes
+                                                 $CODER_PROVISIONER_DAEMON_POLL_JITTER
+                                                 (default 100ms)
       --provisioner-daemons int                  Number of provisioner daemons to create
                                                  on start. If builds are stuck in queued
                                                  state for a long time, consider
diff --git a/coderd/coderd.go b/coderd/coderd.go
index 4cf2762fe9..fbc20cbe74 100644
--- a/coderd/coderd.go
+++ b/coderd/coderd.go
@@ -643,8 +643,8 @@ func compressHandler(h http.Handler) http.Handler {
 	return cmp.Handler(h)
 }
 
-// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
-// in the same process.
+// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
+// Useful when starting coderd and provisionerd in the same process.
 func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
 	clientSession, serverSession := provisionersdk.MemTransportPipe()
 	defer func() {
diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go
index 65f115ae9c..e489a759c0 100644
--- a/coderd/coderdtest/coderdtest.go
+++ b/coderd/coderdtest/coderdtest.go
@@ -340,7 +340,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
 	}, &provisionerd.Options{
 		Filesystem:          fs,
 		Logger:              slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
-		PollInterval:        50 * time.Millisecond,
+		JobPollInterval:     50 * time.Millisecond,
 		UpdateInterval:      250 * time.Millisecond,
 		ForceCancelInterval: time.Second,
 		Provisioners: provisionerd.Provisioners{
@@ -375,7 +375,7 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
 	}, &provisionerd.Options{
 		Filesystem:          fs,
 		Logger:              slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
-		PollInterval:        50 * time.Millisecond,
+		JobPollInterval:     50 * time.Millisecond,
 		UpdateInterval:      250 * time.Millisecond,
 		ForceCancelInterval: time.Second,
 		Provisioners: provisionerd.Provisioners{
diff --git a/codersdk/deploymentconfig.go b/codersdk/deploymentconfig.go
index c274eeee90..4d4c7b4e74 100644
--- a/codersdk/deploymentconfig.go
+++ b/codersdk/deploymentconfig.go
@@ -135,6 +135,8 @@ type GitAuthConfig struct {
 
 type ProvisionerConfig struct {
 	Daemons             *DeploymentConfigField[int]           `json:"daemons" typescript:",notnull"`
+	DaemonPollInterval  *DeploymentConfigField[time.Duration] `json:"daemon_poll_interval" typescript:",notnull"`
+	DaemonPollJitter    *DeploymentConfigField[time.Duration] `json:"daemon_poll_jitter" typescript:",notnull"`
 	ForceCancelInterval *DeploymentConfigField[time.Duration] `json:"force_cancel_interval" typescript:",notnull"`
 }
 
diff --git a/enterprise/cli/provisionerdaemons.go b/enterprise/cli/provisionerdaemons.go
index 8c4fa50e44..8182b5c212 100644
--- a/enterprise/cli/provisionerdaemons.go
+++ b/enterprise/cli/provisionerdaemons.go
@@ -7,6 +7,9 @@ import (
 	"os/signal"
 	"time"
 
+	"github.com/spf13/cobra"
+	"golang.org/x/xerrors"
+
 	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/sloghuman"
 	agpl "github.com/coder/coder/cli"
@@ -20,9 +23,6 @@ import (
 	provisionerdproto "github.com/coder/coder/provisionerd/proto"
 	"github.com/coder/coder/provisionersdk"
 	"github.com/coder/coder/provisionersdk/proto"
-
-	"github.com/spf13/cobra"
-	"golang.org/x/xerrors"
 )
 
 func provisionerDaemons() *cobra.Command {
@@ -37,8 +37,10 @@ func provisionerDaemons() *cobra.Command {
 
 func provisionerDaemonStart() *cobra.Command {
 	var (
-		cacheDir string
-		rawTags  []string
+		cacheDir     string
+		rawTags      []string
+		pollInterval time.Duration
+		pollJitter   time.Duration
 	)
 	cmd := &cobra.Command{
 		Use: "start",
@@ -111,11 +113,12 @@ func provisionerDaemonStart() *cobra.Command {
 					codersdk.ProvisionerTypeTerraform,
 				}, tags)
 			}, &provisionerd.Options{
-				Logger:         logger,
-				PollInterval:   500 * time.Millisecond,
-				UpdateInterval: 500 * time.Millisecond,
-				Provisioners:   provisioners,
-				WorkDirectory:  tempDir,
+				Logger:          logger,
+				JobPollInterval: pollInterval,
+				JobPollJitter:   pollJitter,
+				UpdateInterval:  500 * time.Millisecond,
+				Provisioners:    provisioners,
+				WorkDirectory:   tempDir,
 			})
 
 			var exitErr error
@@ -150,6 +153,10 @@ func provisionerDaemonStart() *cobra.Command {
 		"Specify a directory to cache provisioner job files.")
 	cliflag.StringArrayVarP(cmd.Flags(), &rawTags, "tag", "t", "CODER_PROVISIONERD_TAGS", []string{},
 		"Specify a list of tags to target provisioner jobs.")
+	cliflag.DurationVarP(cmd.Flags(), &pollInterval, "poll-interval", "", "CODER_PROVISIONERD_POLL_INTERVAL", time.Second,
+		"Specify the interval for which the provisioner daemon should poll for jobs.")
+	cliflag.DurationVarP(cmd.Flags(), &pollJitter, "poll-jitter", "", "CODER_PROVISIONERD_POLL_JITTER", 100*time.Millisecond,
+		"Random jitter added to the poll interval.")
 
 	return cmd
 }
diff --git a/enterprise/coderd/provisionerdaemons.go b/enterprise/coderd/provisionerdaemons.go
index 95aab294d5..d13c4ffdd1 100644
--- a/enterprise/coderd/provisionerdaemons.go
+++ b/enterprise/coderd/provisionerdaemons.go
@@ -18,7 +18,6 @@ import (
 	"storj.io/drpc/drpcserver"
 
 	"cdr.dev/slog"
-
 	"github.com/coder/coder/coderd"
 	"github.com/coder/coder/coderd/database"
 	"github.com/coder/coder/coderd/httpapi"
diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go
index 322773b9f0..ce01d835dd 100644
--- a/provisionerd/provisionerd.go
+++ b/provisionerd/provisionerd.go
@@ -23,6 +23,7 @@ import (
 	"cdr.dev/slog"
 
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/cryptorand"
 	"github.com/coder/coder/provisionerd/proto"
 	"github.com/coder/coder/provisionerd/runner"
 	sdkproto "github.com/coder/coder/provisionersdk/proto"
@@ -52,7 +53,9 @@
 	ForceCancelInterval time.Duration
 	UpdateInterval      time.Duration
 	LogBufferInterval   time.Duration
-	PollInterval        time.Duration
+	JobPollInterval     time.Duration
+	JobPollJitter       time.Duration
+	JobPollDebounce     time.Duration
 	Provisioners        Provisioners
 	WorkDirectory       string
 }
@@ -62,8 +65,11 @@ func New(clientDialer Dialer, opts *Options) *Server {
 	if opts == nil {
 		opts = &Options{}
 	}
-	if opts.PollInterval == 0 {
-		opts.PollInterval = 5 * time.Second
+	if opts.JobPollInterval == 0 {
+		opts.JobPollInterval = 5 * time.Second
+	}
+	if opts.JobPollJitter == 0 {
+		opts.JobPollJitter = time.Second
 	}
 	if opts.UpdateInterval == 0 {
 		opts.UpdateInterval = 5 * time.Second
@@ -207,8 +213,8 @@ func (p *Server) connect(ctx context.Context) {
 		if p.isClosed() {
 			return
 		}
-		ticker := time.NewTicker(p.opts.PollInterval)
-		defer ticker.Stop()
+		timer := time.NewTimer(p.opts.JobPollInterval)
+		defer timer.Stop()
 		for {
 			client, ok := p.client()
 			if !ok {
@@ -219,13 +225,23 @@
 				return
 			case <-client.DRPCConn().Closed():
 				return
-			case <-ticker.C:
+			case <-timer.C:
 				p.acquireJob(ctx)
+				timer.Reset(p.nextInterval())
 			}
 		}
 	}()
 }
 
+func (p *Server) nextInterval() time.Duration {
+	r, err := cryptorand.Float64()
+	if err != nil {
+		panic("get random float:" + err.Error())
+	}
+
+	return p.opts.JobPollInterval + time.Duration(float64(p.opts.JobPollJitter)*r)
+}
+
 func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
 	rawClient := p.clientValue.Load()
 	if rawClient == nil {
@@ -248,6 +264,11 @@ func (p *Server) isRunningJob() bool {
 	}
 }
 
+var (
+	lastAcquire      time.Time
+	lastAcquireMutex sync.RWMutex
+)
+
 // Locks a job in the database, and runs it!
 func (p *Server) acquireJob(ctx context.Context) {
 	p.mutex.Lock()
@@ -263,6 +284,18 @@ func (p *Server) acquireJob(ctx context.Context) {
 		return
 	}
 
+	// This prevents loads of provisioner daemons from consistently sending
+	// requests when no jobs are available.
+	//
+	// The debounce only occurs when no job is returned, so if loads of jobs are
+	// added at once, they will start after at most this duration.
+	lastAcquireMutex.RLock()
+	if !lastAcquire.IsZero() && time.Since(lastAcquire) < p.opts.JobPollDebounce {
+		lastAcquireMutex.RUnlock()
+		return
+	}
+	lastAcquireMutex.RUnlock()
+
 	var err error
 	client, ok := p.client()
 	if !ok {
@@ -271,10 +304,9 @@ func (p *Server) acquireJob(ctx context.Context) {
 
 	job, err := client.AcquireJob(ctx, &proto.Empty{})
 	if err != nil {
-		if errors.Is(err, context.Canceled) {
-			return
-		}
-		if errors.Is(err, yamux.ErrSessionShutdown) {
+		if errors.Is(err, context.Canceled) ||
+			errors.Is(err, yamux.ErrSessionShutdown) ||
+			errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
 			return
 		}
 
@@ -282,6 +314,9 @@
 		return
 	}
 	if job.JobId == "" {
+		lastAcquireMutex.Lock()
+		lastAcquire = time.Now()
+		lastAcquireMutex.Unlock()
 		return
 	}
 
diff --git a/provisionerd/provisionerd_test.go b/provisionerd/provisionerd_test.go
index ec885a9a7f..4da7a3a6c9 100644
--- a/provisionerd/provisionerd_test.go
+++ b/provisionerd/provisionerd_test.go
@@ -1053,11 +1053,11 @@ func createTar(t *testing.T, files map[string]string) []byte {
 // Creates a provisionerd implementation with the provided dialer and provisioners.
 func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners provisionerd.Provisioners) *provisionerd.Server {
 	server := provisionerd.New(dialer, &provisionerd.Options{
-		Logger:         slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
-		PollInterval:   50 * time.Millisecond,
-		UpdateInterval: 50 * time.Millisecond,
-		Provisioners:   provisioners,
-		WorkDirectory:  t.TempDir(),
+		Logger:          slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
+		JobPollInterval: 50 * time.Millisecond,
+		UpdateInterval:  50 * time.Millisecond,
+		Provisioners:    provisioners,
+		WorkDirectory:   t.TempDir(),
 	})
 	t.Cleanup(func() {
 		_ = server.Close()
diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts
index 6142d03a36..4ff9b8e875 100644
--- a/site/src/api/typesGenerated.ts
+++ b/site/src/api/typesGenerated.ts
@@ -527,6 +527,8 @@ export interface PrometheusConfig {
 // From codersdk/deploymentconfig.go
 export interface ProvisionerConfig {
   readonly daemons: DeploymentConfigField
+  readonly daemon_poll_interval: DeploymentConfigField
+  readonly daemon_poll_jitter: DeploymentConfigField
   readonly force_cancel_interval: DeploymentConfigField
 }
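Note (not part of the patch): the change above replaces provisionerd's fixed poll ticker with a jittered timer and adds a client-side debounce that skips AcquireJob for a while after the server reports an empty queue. The standalone Go sketch below illustrates how those two mechanisms combine. It is an assumption-laden illustration, not code from the repository: it uses math/rand where the patch uses coder's cryptorand package, and the helper names, defaults, and printed messages are invented for the example.

// Standalone sketch of the jittered poll interval plus empty-acquire debounce
// introduced by this patch. Helper names and defaults are illustrative only.
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var (
	lastEmptyAcquire   time.Time
	lastEmptyAcquireMu sync.RWMutex
)

// nextInterval mirrors provisionerd's nextInterval: the base poll interval
// plus a random fraction of the configured jitter.
func nextInterval(pollInterval, pollJitter time.Duration) time.Duration {
	return pollInterval + time.Duration(float64(pollJitter)*rand.Float64())
}

// shouldSkipAcquire mirrors the debounce check in acquireJob: if the previous
// poll returned no job less than debounce ago, skip calling AcquireJob at all.
func shouldSkipAcquire(debounce time.Duration) bool {
	lastEmptyAcquireMu.RLock()
	defer lastEmptyAcquireMu.RUnlock()
	return !lastEmptyAcquire.IsZero() && time.Since(lastEmptyAcquire) < debounce
}

// recordEmptyAcquire mirrors what acquireJob does when job.JobId is empty.
func recordEmptyAcquire() {
	lastEmptyAcquireMu.Lock()
	lastEmptyAcquire = time.Now()
	lastEmptyAcquireMu.Unlock()
}

func main() {
	pollInterval := time.Second          // --provisioner-daemon-poll-interval default
	pollJitter := 100 * time.Millisecond // --provisioner-daemon-poll-jitter default
	debounce := time.Second              // debounce used for the in-memory daemon in cli/server.go

	for i := 0; i < 3; i++ {
		fmt.Printf("poll %d: next AcquireJob scheduled in %s\n", i, nextInterval(pollInterval, pollJitter))
		if shouldSkipAcquire(debounce) {
			fmt.Println("  debounced: last poll returned no job within the debounce window")
			continue
		}
		// Pretend the server had no job for us (job.JobId == "").
		recordEmptyAcquire()
	}
}

Per the comment added in acquireJob, the debounce only applies after an empty response, so jobs queued in a burst still start within at most the debounce duration; the jitter spreads daemons' poll times apart, which is the usual reason for randomizing a polling interval.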