coder/codersdk/agentsdk/logs_internal_test.go

444 lines
12 KiB
Go

package agentsdk
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
protobuf "google.golang.org/protobuf/proto"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
// TestLogSender_Mainline walks through the ordinary lifecycle of a LogSender: logs enqueued for
// two sources before SendLoop starts arrive as one request per source, a later Enqueue followed
// by Flush arrives within 1.25x flushInterval, WaitUntilEmpty then returns without error, and
// canceling the context makes SendLoop return context.Canceled.
func TestLogSender_Mainline(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	dest := newFakeLogDest()
	sender := NewLogSender(logger)

	firstAt := dbtime.Now()
	src1 := uuid.UUID{0x11}
	sender.Enqueue(src1, Log{
		CreatedAt: firstAt,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})
	src2 := uuid.UUID{0x22}
	sender.Enqueue(src2,
		Log{
			CreatedAt: firstAt,
			Output:    "test log 0, src 2",
			Level:     codersdk.LogLevelError,
		},
		Log{
			CreatedAt: firstAt,
			Output:    "test log 1, src 2",
			Level:     codersdk.LogLevelWarn,
		},
	)

	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()
	drained := make(chan error, 1)
	go func() {
		drained <- sender.WaitUntilEmpty(ctx)
	}()

	// Neither source has ever been flushed, so the loop should send a request for each of them
	// right away. Which source goes first is not under our control.
	initial := make([]*proto.BatchCreateLogsRequest, 0, 2)
	for len(initial) < 2 {
		initial = append(initial, testutil.RequireRecvCtx(ctx, t, dest.reqs))
		testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})
	}
	for _, got := range initial {
		require.NotNil(t, got)
		gotSrc, err := uuid.FromBytes(got.LogSourceId)
		require.NoError(t, err)
		switch gotSrc {
		case src1:
			require.Len(t, got.Logs, 1)
			require.Equal(t, "test log 0, src 1", got.Logs[0].GetOutput())
			require.Equal(t, proto.Log_INFO, got.Logs[0].GetLevel())
			require.Equal(t, firstAt, got.Logs[0].GetCreatedAt().AsTime())
		case src2:
			require.Len(t, got.Logs, 2)
			require.Equal(t, "test log 0, src 2", got.Logs[0].GetOutput())
			require.Equal(t, proto.Log_ERROR, got.Logs[0].GetLevel())
			require.Equal(t, firstAt, got.Logs[0].GetCreatedAt().AsTime())
			require.Equal(t, "test log 1, src 2", got.Logs[1].GetOutput())
			require.Equal(t, proto.Log_WARN, got.Logs[1].GetLevel())
			require.Equal(t, firstAt, got.Logs[1].GetCreatedAt().AsTime())
		default:
			t.Fatal("unknown log source")
		}
	}

	// Second round: one more log for source 1, explicitly flushed.
	secondAt := dbtime.Now()
	sender.Enqueue(src1, Log{
		CreatedAt: secondAt,
		Output:    "test log 1, src 1",
		Level:     codersdk.LogLevelDebug,
	})
	sender.Flush(src1)

	flushed := testutil.RequireRecvCtx(ctx, t, dest.reqs)
	testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})
	// Allow 25% on top of the flush interval in case we landed right on the cusp of a tick.
	require.LessOrEqual(t, time.Since(secondAt), flushInterval*5/4)
	require.NotNil(t, flushed)
	require.Len(t, flushed.Logs, 1)
	require.Equal(t, "test log 1, src 1", flushed.Logs[0].GetOutput())
	require.Equal(t, proto.Log_DEBUG, flushed.Logs[0].GetLevel())
	require.Equal(t, secondAt, flushed.Logs[0].GetCreatedAt().AsTime())

	// Everything enqueued so far has been acknowledged, so the waiter must be released cleanly.
	require.NoError(t, testutil.RequireRecvCtx(ctx, t, drained))

	cancel()
	require.ErrorIs(t, testutil.RequireRecvCtx(testCtx, t, loopDone), context.Canceled)

	// Enqueue must remain callable once SendLoop has returned.
	sender.Enqueue(src1, Log{
		CreatedAt: secondAt,
		Output:    "test log 2, src 1",
		Level:     codersdk.LogLevelTrace,
	})
}
// TestLogSender_LogLimitExceeded checks what happens when the destination answers with
// LogLimitExceeded: SendLoop returns LogLimitExceededError, WaitUntilEmpty is released without
// error, later Enqueue calls leave the queues empty, and a fresh SendLoop returns the same error.
func TestLogSender_LogLimitExceeded(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	dest := newFakeLogDest()
	sender := NewLogSender(logger)

	createdAt := dbtime.Now()
	src := uuid.UUID{0x11}
	sender.Enqueue(src, Log{
		CreatedAt: createdAt,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})

	drained := make(chan error, 1)
	go func() {
		drained <- sender.WaitUntilEmpty(ctx)
	}()
	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()

	// Answer the first request by reporting that the limit has been exceeded.
	first := testutil.RequireRecvCtx(ctx, t, dest.reqs)
	require.NotNil(t, first)
	limitHit := &proto.BatchCreateLogsResponse{LogLimitExceeded: true}
	testutil.RequireSendCtx(ctx, t, dest.resps, limitHit)

	require.ErrorIs(t, testutil.RequireRecvCtx(ctx, t, loopDone), LogLimitExceededError)

	// The waiter is expected to be unblocked too, and without an error.
	require.NoError(t, testutil.RequireRecvCtx(ctx, t, drained))

	// Enqueue is still callable after SendLoop has returned, but the log must not end up in
	// any queue.
	sender.Enqueue(src, Log{
		CreatedAt: createdAt,
		Output:    "test log 2, src 1",
		Level:     codersdk.LogLevelTrace,
	})
	sender.L.Lock()
	require.Len(t, sender.queues, 0)
	sender.L.Unlock()

	// Starting SendLoop a second time should return the same error straight away.
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()
	require.ErrorIs(t, testutil.RequireRecvCtx(ctx, t, loopDone), LogLimitExceededError)
}
// TestLogSender_SkipHugeLog enqueues a log whose output is maxBytesPerBatch-1 bytes long followed
// by a small one, and asserts that the only request sent contains just the small log.
func TestLogSender_SkipHugeLog(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	dest := newFakeLogDest()
	sender := NewLogSender(logger)

	createdAt := dbtime.Now()
	src := uuid.UUID{0x11}

	// Overhead is added on top of the raw output length, so even an output one byte under the
	// per-batch limit is too big to be accepted.
	oversized := make([]byte, maxBytesPerBatch-1)
	for i := 0; i < len(oversized); i++ {
		oversized[i] = 'q'
	}
	sender.Enqueue(src,
		Log{
			CreatedAt: createdAt,
			Output:    string(oversized),
			Level:     codersdk.LogLevelInfo,
		},
		Log{
			CreatedAt: createdAt,
			Output:    "test log 1, src 1",
			Level:     codersdk.LogLevelInfo,
		},
	)

	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()

	got := testutil.RequireRecvCtx(ctx, t, dest.reqs)
	require.NotNil(t, got)
	require.Len(t, got.Logs, 1, "it should skip the huge log")
	survivor := got.Logs[0]
	require.Equal(t, "test log 1, src 1", survivor.GetOutput())
	require.Equal(t, proto.Log_INFO, survivor.GetLevel())
	testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})

	cancel()
	require.ErrorIs(t, testutil.RequireRecvCtx(testCtx, t, loopDone), context.Canceled)
}
// TestLogSender_Batch enqueues 60000 one-character logs for a single source and asserts that they
// arrive across two requests, each of which marshals to fewer than maxBytesPerBatch bytes, with
// all 60000 logs accounted for.
func TestLogSender_Batch(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	dest := newFakeLogDest()
	sender := NewLogSender(logger)

	createdAt := dbtime.Now()
	src := uuid.UUID{0x11}
	batch := make([]Log, 0, 60000)
	for len(batch) < 60000 {
		batch = append(batch, Log{
			CreatedAt: createdAt,
			Output:    "r",
			Level:     codersdk.LogLevelInfo,
		})
	}
	sender.Enqueue(src, batch...)

	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()

	// recvChecked receives the next request, verifies its wire size stays under the per-batch
	// limit, and reports how many logs it carried. It does not acknowledge the request.
	recvChecked := func() int {
		got := testutil.RequireRecvCtx(ctx, t, dest.reqs)
		require.NotNil(t, got)
		count := len(got.Logs)
		encoded, err := protobuf.Marshal(got)
		require.NoError(t, err)
		require.Less(t, len(encoded), maxBytesPerBatch, "wire should not exceed 1MiB")
		return count
	}

	// Each log is about 21 bytes, so 60k of them have to be split over two updates to stay
	// under 1MiB.
	received := recvChecked()
	testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})

	received += recvChecked()
	require.Equal(t, 60000, received)
	testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})

	cancel()
	require.ErrorIs(t, testutil.RequireRecvCtx(testCtx, t, loopDone), context.Canceled)
}
// TestLogSender_MaxQueuedLogs fills the sender up to exactly maxBytesQueued bytes of output from
// one source, enqueues the same logs again for a second source, and asserts that over three
// requests all logs from the first source but only one from the second are sent.
func TestLogSender_MaxQueuedLogs(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	dest := newFakeLogDest()
	sender := NewLogSender(logger)

	createdAt := dbtime.Now()
	src1 := uuid.UUID{0x11}
	logCount := 4
	chunk := make([]byte, maxBytesQueued/logCount)
	for i := 0; i < len(chunk); i++ {
		chunk[i] = 'q'
	}
	payload := make([]Log, 0, logCount)
	for len(payload) < logCount {
		payload = append(payload, Log{
			CreatedAt: createdAt,
			Output:    string(chunk),
			Level:     codersdk.LogLevelInfo,
		})
	}
	sender.Enqueue(src1, payload...)

	// The queued output now sits exactly at the limit.
	require.Equal(t, maxBytesQueued, sender.outputLen)

	// Enqueueing beyond the limit is not supposed to fail...
	src2 := uuid.UUID{0x22}
	sender.Enqueue(src2, payload...)

	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()

	// One log from source #2 should still have been queued, so that we would exceed the
	// database limit. Because of overhead, the logs from source #1 need 2 updates, and source #2
	// adds 1 more, for 3 updates in total.
	const wantUpdates = 3
	perSource := map[uuid.UUID]int{}
	for seen := 0; seen < wantUpdates; seen++ {
		got := testutil.RequireRecvCtx(ctx, t, dest.reqs)
		require.NotNil(t, got)
		id, err := uuid.FromBytes(got.LogSourceId)
		require.NoError(t, err)
		perSource[id] += len(got.Logs)
		testutil.RequireSendCtx(ctx, t, dest.resps, &proto.BatchCreateLogsResponse{})
	}
	want := map[uuid.UUID]int{
		src1: logCount,
		src2: 1,
	}
	require.Equal(t, want, perSource)

	cancel()
	require.ErrorIs(t, testutil.RequireRecvCtx(testCtx, t, loopDone), context.Canceled)
}
// TestLogSender_SendError configures the fake destination to fail and asserts that SendLoop
// returns that error, and that a log enqueued afterwards is still held in a queue.
func TestLogSender_SendError(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

	sendFailure := xerrors.New("test")
	dest := newFakeLogDest()
	dest.err = sendFailure

	sender := NewLogSender(logger)
	createdAt := dbtime.Now()
	src := uuid.UUID{0x11}
	sender.Enqueue(src, Log{
		CreatedAt: createdAt,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})

	loopDone := make(chan error, 1)
	go func() {
		loopDone <- sender.SendLoop(ctx, dest)
	}()

	// The fake publishes the request before failing, so it must be consumed here.
	got := testutil.RequireRecvCtx(ctx, t, dest.reqs)
	require.NotNil(t, got)

	require.ErrorIs(t, testutil.RequireRecvCtx(ctx, t, loopDone), sendFailure)

	// Enqueue must remain usable once SendLoop has returned.
	sender.Enqueue(src, Log{
		CreatedAt: createdAt,
		Output:    "test log 2, src 1",
		Level:     codersdk.LogLevelTrace,
	})
	sender.L.Lock()
	require.Len(t, sender.queues, 1)
	sender.L.Unlock()
}
// TestLogSender_WaitUntilEmpty_ContextExpired asserts that WaitUntilEmpty returns
// context.Canceled when its context is canceled while a log is queued and no SendLoop is running.
func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	sender := NewLogSender(logger)

	src := uuid.UUID{0x11}
	sender.Enqueue(src, Log{
		CreatedAt: dbtime.Now(),
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})

	waitDone := make(chan error, 1)
	go func() {
		waitDone <- sender.WaitUntilEmpty(ctx)
	}()

	cancel()
	require.ErrorIs(t, testutil.RequireRecvCtx(testCtx, t, waitDone), context.Canceled)
}
// fakeLogDest is a test double whose BatchCreateLogs method hands each request to the test over
// reqs and then replies with whatever the test supplies on resps, or with err when it is set.
type fakeLogDest struct {
// reqs carries every request passed to BatchCreateLogs out to the test.
reqs chan *proto.BatchCreateLogsRequest
// resps supplies the response that BatchCreateLogs returns for the request it just published.
resps chan *proto.BatchCreateLogsResponse
// err, when non-nil, is returned by BatchCreateLogs right after the request has been published
// on reqs; resps is not consulted in that case.
err error
}
// BatchCreateLogs publishes req on f.reqs and then returns either f.err, if it is set, or the
// next response read from f.resps. If ctx is done while it is blocked on either channel, it
// returns ctx.Err() instead.
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	// Detach the logs slice from the sender's copy so that later changes made by the sender do
	// not show up in what the test inspects. In production the request would be
	// serialized/deserialized, so this is only a concern for the fake.
	req.Logs = slices.Clone(req.Logs)

	// Step 1: hand the request over to the test.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case f.reqs <- req:
	}

	// A configured error takes precedence over any response from the test.
	if f.err != nil {
		return nil, f.err
	}

	// Step 2: wait for the test to provide the response.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case reply := <-f.resps:
		return reply, nil
	}
}
// newFakeLogDest returns a fakeLogDest with unbuffered request and response channels and no
// error configured.
func newFakeLogDest() *fakeLogDest {
	dest := new(fakeLogDest)
	dest.reqs = make(chan *proto.BatchCreateLogsRequest)
	dest.resps = make(chan *proto.BatchCreateLogsResponse)
	return dest
}