chore: fix buffered provisioner job logs close flake (#6492)

See https://github.com/coder/coder/actions/runs/4357599919/jobs/7617111287
This commit is contained in:
Kyle Carberry 2023-03-07 14:08:13 -06:00 committed by GitHub
parent 1bdd2abed7
commit bb0a996fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 12 deletions

View File

@ -49,7 +49,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
// if we are following logs, start the subscription before we query the database, so that we don't miss any logs
// between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track
// of processed IDs.
var bufferedLogs <-chan database.ProvisionerJobLog
var bufferedLogs <-chan *database.ProvisionerJobLog
if follow {
bl, closeFollow, err := api.followLogs(actor, job.ID)
if err != nil {
@ -173,8 +173,9 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
logger.Debug(context.Background(), "job logs context canceled")
return
case log, ok := <-bufferedLogs:
if !ok {
logger.Debug(context.Background(), "done with published logs")
// A nil log is sent when complete!
if !ok || log == nil {
logger.Debug(context.Background(), "reached the end of published logs")
return
}
if logIdsDone[log.ID] {
@ -183,7 +184,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
} else {
logger.Debug(ctx, "subscribe encoding log",
slog.F("stage", log.Stage))
err = encoder.Encode(convertProvisionerJobLog(log))
err = encoder.Encode(convertProvisionerJobLog(*log))
if err != nil {
return
}
@ -369,12 +370,12 @@ type provisionerJobLogsMessage struct {
EndOfLogs bool `json:"end_of_logs,omitempty"`
}
func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *database.ProvisionerJobLog, func(), error) {
logger := api.Logger.With(slog.F("job_id", jobID))
var (
closed = make(chan struct{})
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
logMut = &sync.Mutex{}
)
closeSubscribe, err := api.Pubsub.Subscribe(
@ -415,9 +416,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
return
default:
}
log := log
select {
case bufferedLogs <- log:
case bufferedLogs <- &log:
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
default:
// If this overflows users could miss logs streaming. This can happen
@ -439,9 +440,7 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
default:
}
logger.Debug(ctx, "got End of Logs")
close(closed)
close(bufferedLogs)
bufferedLogs <- nil
logMut.Unlock()
}
},
@ -449,5 +448,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
if err != nil {
return nil, nil, err
}
return bufferedLogs, closeSubscribe, nil
return bufferedLogs, func() {
closeSubscribe()
close(closed)
close(bufferedLogs)
}, nil
}