feat: add debouncing to provisionerd rpc calls (#5198)

Colin Adler 2022-12-01 16:54:53 -06:00 committed by GitHub
parent 5457dd0c65
commit ab3b3d5fca
14 changed files with 123 additions and 46 deletions

.gitignore

@@ -14,6 +14,7 @@ vendor
.eslintcache
yarn-error.log
gotests.coverage
gotests.xml
.idea
.gitpod.yml
.DS_Store


@@ -388,6 +388,18 @@ func newConfig() *codersdk.DeploymentConfig {
Flag: "provisioner-daemons",
Default: 3,
},
DaemonPollInterval: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Poll Interval",
Usage: "Time to wait before polling for a new job.",
Flag: "provisioner-daemon-poll-interval",
Default: time.Second,
},
DaemonPollJitter: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Poll Jitter",
Usage: "Random jitter added to the poll interval.",
Flag: "provisioner-daemon-poll-jitter",
Default: 100 * time.Millisecond,
},
ForceCancelInterval: &codersdk.DeploymentConfigField[time.Duration]{
Name: "Force Cancel Interval",
Usage: "Time to force cancel provisioning tasks that are stuck.",


@@ -2,6 +2,7 @@ package deployment_test
import (
"testing"
"time"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
@@ -25,20 +26,22 @@ func TestConfig(t *testing.T) {
}{{
Name: "Deployment",
Env: map[string]string{
"CODER_ADDRESS": "0.0.0.0:8443",
"CODER_ACCESS_URL": "https://dev.coder.com",
"CODER_PG_CONNECTION_URL": "some-url",
"CODER_PPROF_ADDRESS": "something",
"CODER_PPROF_ENABLE": "true",
"CODER_PROMETHEUS_ADDRESS": "hello-world",
"CODER_PROMETHEUS_ENABLE": "true",
"CODER_PROVISIONER_DAEMONS": "5",
"CODER_SECURE_AUTH_COOKIE": "true",
"CODER_SSH_KEYGEN_ALGORITHM": "potato",
"CODER_TELEMETRY": "false",
"CODER_TELEMETRY_TRACE": "false",
"CODER_WILDCARD_ACCESS_URL": "something-wildcard.com",
"CODER_UPDATE_CHECK": "false",
"CODER_ADDRESS": "0.0.0.0:8443",
"CODER_ACCESS_URL": "https://dev.coder.com",
"CODER_PG_CONNECTION_URL": "some-url",
"CODER_PPROF_ADDRESS": "something",
"CODER_PPROF_ENABLE": "true",
"CODER_PROMETHEUS_ADDRESS": "hello-world",
"CODER_PROMETHEUS_ENABLE": "true",
"CODER_PROVISIONER_DAEMONS": "5",
"CODER_PROVISIONER_DAEMON_POLL_INTERVAL": "5s",
"CODER_PROVISIONER_DAEMON_POLL_JITTER": "1s",
"CODER_SECURE_AUTH_COOKIE": "true",
"CODER_SSH_KEYGEN_ALGORITHM": "potato",
"CODER_TELEMETRY": "false",
"CODER_TELEMETRY_TRACE": "false",
"CODER_WILDCARD_ACCESS_URL": "something-wildcard.com",
"CODER_UPDATE_CHECK": "false",
},
Valid: func(config *codersdk.DeploymentConfig) {
require.Equal(t, config.Address.Value, "0.0.0.0:8443")
@@ -49,6 +52,8 @@ func TestConfig(t *testing.T) {
require.Equal(t, config.Prometheus.Address.Value, "hello-world")
require.Equal(t, config.Prometheus.Enable.Value, true)
require.Equal(t, config.Provisioner.Daemons.Value, 5)
require.Equal(t, config.Provisioner.DaemonPollInterval.Value, 5*time.Second)
require.Equal(t, config.Provisioner.DaemonPollJitter.Value, 1*time.Second)
require.Equal(t, config.SecureAuthCookie.Value, true)
require.Equal(t, config.SSHKeygenAlgorithm.Value, "potato")
require.Equal(t, config.Telemetry.Enable.Value, false)


@@ -22,6 +22,8 @@ import (
"github.com/coder/coder/testutil"
)
// To update the golden files:
// make update-golden-files
var updateGoldenFiles = flag.Bool("update", false, "update .golden files")
//nolint:tparallel,paralleltest // These test sets env vars.


@@ -970,13 +970,16 @@ func newProvisionerDaemon(
}()
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(echoClient)
}
debounce := time.Second
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
// This debounces calls to listen every second. Read the comment
// in provisionerdserver.go to learn more!
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, time.Second)
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, debounce)
}, &provisionerd.Options{
Logger: logger,
PollInterval: 500 * time.Millisecond,
JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value,
JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value,
JobPollDebounce: debounce,
UpdateInterval: 500 * time.Millisecond,
ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value,
Provisioners: provisioners,


@@ -128,6 +128,15 @@ Flags:
--prometheus-enable Serve prometheus metrics on the address
defined by prometheus address.
Consumes $CODER_PROMETHEUS_ENABLE
--provisioner-daemon-poll-interval duration Time to wait before polling for a new
job.
Consumes
$CODER_PROVISIONER_DAEMON_POLL_INTERVAL
(default 1s)
--provisioner-daemon-poll-jitter duration Random jitter added to the poll interval.
Consumes
$CODER_PROVISIONER_DAEMON_POLL_JITTER
(default 100ms)
--provisioner-daemons int Number of provisioner daemons to create
on start. If builds are stuck in queued
state for a long time, consider


@@ -643,8 +643,8 @@ func compressHandler(h http.Handler) http.Handler {
return cmp.Handler(h)
}
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
// in the same process.
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
clientSession, serverSession := provisionersdk.MemTransportPipe()
defer func() {


@@ -340,7 +340,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
}, &provisionerd.Options{
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Provisioners: provisionerd.Provisioners{
@@ -375,7 +375,7 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
}, &provisionerd.Options{
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 250 * time.Millisecond,
ForceCancelInterval: time.Second,
Provisioners: provisionerd.Provisioners{


@@ -135,6 +135,8 @@ type GitAuthConfig struct {
type ProvisionerConfig struct {
Daemons *DeploymentConfigField[int] `json:"daemons" typescript:",notnull"`
DaemonPollInterval *DeploymentConfigField[time.Duration] `json:"daemon_poll_interval" typescript:",notnull"`
DaemonPollJitter *DeploymentConfigField[time.Duration] `json:"daemon_poll_jitter" typescript:",notnull"`
ForceCancelInterval *DeploymentConfigField[time.Duration] `json:"force_cancel_interval" typescript:",notnull"`
}


@@ -7,6 +7,9 @@ import (
"os/signal"
"time"
"github.com/spf13/cobra"
"golang.org/x/xerrors"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
agpl "github.com/coder/coder/cli"
@@ -20,9 +23,6 @@ import (
provisionerdproto "github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
"github.com/coder/coder/provisionersdk/proto"
"github.com/spf13/cobra"
"golang.org/x/xerrors"
)
func provisionerDaemons() *cobra.Command {
@@ -37,8 +37,10 @@ func provisionerDaemons() *cobra.Command {
func provisionerDaemonStart() *cobra.Command {
var (
cacheDir string
rawTags []string
cacheDir string
rawTags []string
pollInterval time.Duration
pollJitter time.Duration
)
cmd := &cobra.Command{
Use: "start",
@@ -111,11 +113,12 @@ func provisionerDaemonStart() *cobra.Command {
codersdk.ProvisionerTypeTerraform,
}, tags)
}, &provisionerd.Options{
Logger: logger,
PollInterval: 500 * time.Millisecond,
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
Logger: logger,
JobPollInterval: pollInterval,
JobPollJitter: pollJitter,
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
})
var exitErr error
@@ -150,6 +153,10 @@ func provisionerDaemonStart() *cobra.Command {
"Specify a directory to cache provisioner job files.")
cliflag.StringArrayVarP(cmd.Flags(), &rawTags, "tag", "t", "CODER_PROVISIONERD_TAGS", []string{},
"Specify a list of tags to target provisioner jobs.")
cliflag.DurationVarP(cmd.Flags(), &pollInterval, "poll-interval", "", "CODER_PROVISIONERD_POLL_INTERVAL", time.Second,
"Specify the interval for which the provisioner daemon should poll for jobs.")
cliflag.DurationVarP(cmd.Flags(), &pollJitter, "poll-jitter", "", "CODER_PROVISIONERD_POLL_JITTER", 100*time.Millisecond,
"Random jitter added to the poll interval.")
return cmd
}


@@ -18,7 +18,6 @@ import (
"storj.io/drpc/drpcserver"
"cdr.dev/slog"
"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/httpapi"


@@ -23,6 +23,7 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionerd/runner"
sdkproto "github.com/coder/coder/provisionersdk/proto"
@@ -52,7 +53,9 @@ type Options struct {
ForceCancelInterval time.Duration
UpdateInterval time.Duration
LogBufferInterval time.Duration
PollInterval time.Duration
JobPollInterval time.Duration
JobPollJitter time.Duration
JobPollDebounce time.Duration
Provisioners Provisioners
WorkDirectory string
}
@@ -62,8 +65,11 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts == nil {
opts = &Options{}
}
if opts.PollInterval == 0 {
opts.PollInterval = 5 * time.Second
if opts.JobPollInterval == 0 {
opts.JobPollInterval = 5 * time.Second
}
if opts.JobPollJitter == 0 {
opts.JobPollJitter = time.Second
}
if opts.UpdateInterval == 0 {
opts.UpdateInterval = 5 * time.Second
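
For illustration, a wiring sketch (hypothetical helper name, compilable but not from this commit) of how a caller supplies the renamed and new options; the dialer, logger, and provisioner map come from the surrounding setup:

package example

import (
	"time"

	"cdr.dev/slog"
	"github.com/coder/coder/provisionerd"
)

// newDaemon shows the new poll options replacing the old single PollInterval.
func newDaemon(dial provisionerd.Dialer, logger slog.Logger, provs provisionerd.Provisioners) *provisionerd.Server {
	return provisionerd.New(dial, &provisionerd.Options{
		Logger:          logger,
		JobPollInterval: time.Second,            // base wait between AcquireJob polls
		JobPollJitter:   100 * time.Millisecond, // random spread added to each wait
		JobPollDebounce: time.Second,            // quiet period after a poll that found no job
		UpdateInterval:  500 * time.Millisecond,
		Provisioners:    provs,
		WorkDirectory:   "/tmp/provisionerd",
	})
}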
@@ -207,8 +213,8 @@ func (p *Server) connect(ctx context.Context) {
if p.isClosed() {
return
}
ticker := time.NewTicker(p.opts.PollInterval)
defer ticker.Stop()
timer := time.NewTimer(p.opts.JobPollInterval)
defer timer.Stop()
for {
client, ok := p.client()
if !ok {
@@ -219,13 +225,23 @@ func (p *Server) connect(ctx context.Context) {
return
case <-client.DRPCConn().Closed():
return
case <-ticker.C:
case <-timer.C:
p.acquireJob(ctx)
timer.Reset(p.nextInterval())
}
}
}()
}
func (p *Server) nextInterval() time.Duration {
r, err := cryptorand.Float64()
if err != nil {
panic("get random float:" + err.Error())
}
return p.opts.JobPollInterval + time.Duration(float64(p.opts.JobPollJitter)*r)
}
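
The switch from a fixed-period Ticker to a one-shot Timer is what lets the wait be recomputed after every poll. Below is a stripped-down, runnable version of the same loop (hypothetical names, no provisionerd dependencies; the daemon itself draws the random fraction from coder's cryptorand package rather than math/rand):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// pollLoop re-arms a one-shot timer with a freshly jittered delay after each
// poll instead of ticking at a fixed period.
func pollLoop(ctx context.Context, interval, jitter time.Duration, poll func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			poll()
			timer.Reset(interval + time.Duration(rand.Float64()*float64(jitter)))
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	pollLoop(ctx, 100*time.Millisecond, 20*time.Millisecond, func() {
		fmt.Println("poll at", time.Now().Format(time.StampMilli))
	})
}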
func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
rawClient := p.clientValue.Load()
if rawClient == nil {
@@ -248,6 +264,11 @@ func (p *Server) isRunningJob() bool {
}
}
var (
lastAcquire time.Time
lastAcquireMutex sync.RWMutex
)
// Locks a job in the database, and runs it!
func (p *Server) acquireJob(ctx context.Context) {
p.mutex.Lock()
@@ -263,6 +284,18 @@ func (p *Server) acquireJob(ctx context.Context) {
return
}
// This prevents loads of provisioner daemons from consistently sending
// requests when no jobs are available.
//
// The debounce only occurs when no job is returned, so if loads of jobs are
// added at once, they will start after at most this duration.
lastAcquireMutex.RLock()
if !lastAcquire.IsZero() && time.Since(lastAcquire) < p.opts.JobPollDebounce {
lastAcquireMutex.RUnlock()
return
}
lastAcquireMutex.RUnlock()
var err error
client, ok := p.client()
if !ok {
@@ -271,10 +304,9 @@ func (p *Server) acquireJob(ctx context.Context) {
job, err := client.AcquireJob(ctx, &proto.Empty{})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, yamux.ErrSessionShutdown) {
if errors.Is(err, context.Canceled) ||
errors.Is(err, yamux.ErrSessionShutdown) ||
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
return
}
@@ -282,6 +314,9 @@ func (p *Server) acquireJob(ctx context.Context) {
return
}
if job.JobId == "" {
lastAcquireMutex.Lock()
lastAcquire = time.Now()
lastAcquireMutex.Unlock()
return
}
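
To make the debounce concrete, here is a self-contained sketch of the pattern above (tryAcquire and pollEmpty are hypothetical stand-ins for acquireJob and an empty AcquireJob response). Because lastAcquire is package-level state, one empty poll by any daemon in the process quiets all of them for the debounce window:

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	lastAcquire      time.Time
	lastAcquireMutex sync.RWMutex
)

// tryAcquire skips the request entirely while the debounce window from the
// last empty poll is still open, and restarts the window whenever a poll
// comes back empty. It reports whether the poll was actually attempted.
func tryAcquire(debounce time.Duration, pollEmpty func() bool) bool {
	lastAcquireMutex.RLock()
	debounced := !lastAcquire.IsZero() && time.Since(lastAcquire) < debounce
	lastAcquireMutex.RUnlock()
	if debounced {
		return false
	}
	if pollEmpty() {
		lastAcquireMutex.Lock()
		lastAcquire = time.Now()
		lastAcquireMutex.Unlock()
	}
	return true
}

func main() {
	alwaysEmpty := func() bool { return true }
	fmt.Println(tryAcquire(time.Second, alwaysEmpty)) // true: first poll goes through
	fmt.Println(tryAcquire(time.Second, alwaysEmpty)) // false: suppressed by the debounce
	time.Sleep(time.Second)
	fmt.Println(tryAcquire(time.Second, alwaysEmpty)) // true: the window has expired
}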


@@ -1053,11 +1053,11 @@ func createTar(t *testing.T, files map[string]string) []byte {
// Creates a provisionerd implementation with the provided dialer and provisioners.
func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners provisionerd.Provisioners) *provisionerd.Server {
server := provisionerd.New(dialer, &provisionerd.Options{
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
UpdateInterval: 50 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: t.TempDir(),
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
JobPollInterval: 50 * time.Millisecond,
UpdateInterval: 50 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: t.TempDir(),
})
t.Cleanup(func() {
_ = server.Close()


@@ -527,6 +527,8 @@ export interface PrometheusConfig {
// From codersdk/deploymentconfig.go
export interface ProvisionerConfig {
readonly daemons: DeploymentConfigField<number>
readonly daemon_poll_interval: DeploymentConfigField<number>
readonly daemon_poll_jitter: DeploymentConfigField<number>
readonly force_cancel_interval: DeploymentConfigField<number>
}