chore: Fix changes from buffer provisioner logs (#4924)

Comments from #4918 were missed because of auto-merge.
This commit is contained in:
Kyle Carberry 2022-11-06 21:59:01 -08:00 committed by GitHub
parent 30281852d6
commit 53f2449e4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 5 deletions

View File

@ -50,7 +50,7 @@ type Options struct {
ForceCancelInterval time.Duration
UpdateInterval time.Duration
LogDebounceInterval time.Duration
LogBufferInterval time.Duration
PollInterval time.Duration
Provisioners Provisioners
WorkDirectory string
@ -67,8 +67,8 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts.ForceCancelInterval == 0 {
opts.ForceCancelInterval = time.Minute
}
if opts.LogDebounceInterval == 0 {
opts.LogDebounceInterval = 50 * time.Millisecond
if opts.LogBufferInterval == 0 {
opts.LogBufferInterval = 50 * time.Millisecond
}
if opts.Filesystem == nil {
opts.Filesystem = afero.NewOsFs()
@ -329,7 +329,7 @@ func (p *Server) acquireJob(ctx context.Context) {
provisioner,
p.opts.UpdateInterval,
p.opts.ForceCancelInterval,
p.opts.LogDebounceInterval,
p.opts.LogBufferInterval,
p.tracer,
p.opts.Metrics.Runner,
)

View File

@ -898,6 +898,9 @@ func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.Span
))...)
}
// queueLog adds a log to the buffer and debounces a timer
// if one exists to flush the logs. It stores a maximum of
// 100 log lines before flushing as a safe-guard mechanism.
func (r *Runner) queueLog(ctx context.Context, log *proto.Log) {
r.mutex.Lock()
defer r.mutex.Unlock()
@ -906,6 +909,7 @@ func (r *Runner) queueLog(ctx context.Context, log *proto.Log) {
r.flushLogsTimer.Reset(r.logBufferInterval)
return
}
// This can be configurable if there are a ton of logs.
if len(r.queuedLogs) > 100 {
// Flushing logs requires a lock, so this can happen async.
go r.flushQueuedLogs(ctx)
@ -921,7 +925,7 @@ func (r *Runner) flushQueuedLogs(ctx context.Context) {
if r.flushLogsTimer != nil {
r.flushLogsTimer.Stop()
}
logs := r.queuedLogs[:]
logs := r.queuedLogs
r.queuedLogs = make([]*proto.Log, 0)
r.mutex.Unlock()
_, err := r.update(ctx, &proto.UpdateJobRequest{