feat: add WaitUntilEmpty to LogSender (#12159)

We'll need this to be able to tell when all outstanding logs have been sent, as part of graceful shutdown.
This commit is contained in:
Spike Curtis 2024-02-20 11:11:31 +04:00 committed by GitHub
parent 081e37d7d9
commit ab4cb66e00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 77 additions and 1 deletions

View File

@ -437,6 +437,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
l.exceededLogLimit = true
// no point in keeping anything we have queued around, server will not accept them
l.queues = make(map[uuid.UUID]*logQueue)
l.Broadcast() // might unblock WaitUntilEmpty
return LogLimitExceededError
}
@ -451,6 +452,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
if len(q.logs) == 0 {
// no empty queues
delete(l.queues, src)
l.Broadcast() // might unblock WaitUntilEmpty
continue
}
q.lastFlush = time.Now()
@ -487,6 +489,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
return ScriptLogger{srcID: logSourceID, sender: l}
}
// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires.
func (l *LogSender) WaitUntilEmpty(ctx context.Context) error {
	// ctxDone is written by the watcher goroutine and read by the wait loop
	// below; both accesses happen while holding l.L, so no extra
	// synchronization is needed.
	ctxDone := false
	// nevermind signals the watcher goroutine to exit when this function
	// returns, so it is not leaked if the queues drain before ctx expires.
	nevermind := make(chan struct{})
	defer close(nevermind)
	go func() {
		select {
		case <-ctx.Done():
			// Set the flag under the lock, then Broadcast so the
			// condition-variable loop wakes up and re-checks ctxDone.
			l.L.Lock()
			defer l.L.Unlock()
			ctxDone = true
			l.Broadcast()
			return
		case <-nevermind:
			return
		}
	}()
	l.L.Lock()
	defer l.L.Unlock()
	// Standard condition-variable wait loop: Wait atomically releases l.L
	// and reacquires it before returning. Re-check both conditions on every
	// wakeup, since a Broadcast does not indicate which one changed.
	for len(l.queues) != 0 && !ctxDone {
		l.Wait()
	}
	if len(l.queues) == 0 {
		// Queues drained: report success, even if ctx also expired in the
		// meantime.
		return nil
	}
	return ctx.Err()
}
type ScriptLogger struct {
sender *LogSender
srcID uuid.UUID

View File

@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) {
loopErr <- err
}()
empty := make(chan error, 1)
go func() {
err := uut.WaitUntilEmpty(ctx)
empty <- err
}()
// since neither source has even been flushed, it should immediately Flush
// both, although the order is not controlled
var logReqs []*proto.BatchCreateLogsRequest
@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) {
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
err := testutil.RequireRecvCtx(ctx, t, empty)
require.NoError(t, err)
cancel()
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
require.ErrorIs(t, err, context.Canceled)
// we can still enqueue more logs after SendLoop returns
@ -132,6 +141,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
Level: codersdk.LogLevelInfo,
})
empty := make(chan error, 1)
go func() {
err := uut.WaitUntilEmpty(ctx)
empty <- err
}()
loopErr := make(chan error, 1)
go func() {
err := uut.SendLoop(ctx, fDest)
@ -146,6 +161,10 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
err := testutil.RequireRecvCtx(ctx, t, loopErr)
require.ErrorIs(t, err, LogLimitExceededError)
// Should also unblock WaitUntilEmpty
err = testutil.RequireRecvCtx(ctx, t, empty)
require.NoError(t, err)
// we can still enqueue more logs after SendLoop returns, but they don't
// actually get enqueued
uut.Enqueue(ls1, Log{
@ -363,6 +382,33 @@ func TestLogSender_SendError(t *testing.T) {
uut.L.Unlock()
}
// TestLogSender_WaitUntilEmpty_ContextExpired verifies that WaitUntilEmpty
// returns the context's error when the context is canceled while a log is
// still queued. No SendLoop runs here, so the queue can never drain and the
// only way out is cancellation.
func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	waitCtx, cancelWait := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	uut := NewLogSender(logger)

	// Enqueue one log so the queue is non-empty.
	now := dbtime.Now()
	srcID := uuid.UUID{0x11}
	uut.Enqueue(srcID, Log{
		CreatedAt: now,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})

	waitErr := make(chan error, 1)
	go func() {
		waitErr <- uut.WaitUntilEmpty(waitCtx)
	}()

	cancelWait()
	gotErr := testutil.RequireRecvCtx(testCtx, t, waitErr)
	require.ErrorIs(t, gotErr, context.Canceled)
}
type fakeLogDest struct {
reqs chan *proto.BatchCreateLogsRequest
resps chan *proto.BatchCreateLogsResponse