mirror of https://github.com/coder/coder.git
feat: add logSender for sending logs on agent v2 API (#12046)
Adds a new subcomponent of the agent for queueing up logs until they can be sent over the Agent API. Subsequent PR will change the agent to use this instead of the HTTP API for posting logs. Relates to #10534
This commit is contained in:
parent
627232eae9
commit
2aff014e5d
|
@ -0,0 +1,239 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
)
|
||||
|
||||
const (
|
||||
flushInterval = time.Second
|
||||
maxBytesPerBatch = 1 << 20 // 1MiB
|
||||
overheadPerLog = 21 // found by testing
|
||||
|
||||
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
|
||||
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
|
||||
// accept in the database.
|
||||
maxBytesQueued = 1048576
|
||||
)
|
||||
|
||||
type logQueue struct {
|
||||
logs []*proto.Log
|
||||
flushRequested bool
|
||||
lastFlush time.Time
|
||||
}
|
||||
|
||||
// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
|
||||
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
|
||||
// the agent calls sendLoop to send pending logs.
|
||||
type logSender struct {
|
||||
*sync.Cond
|
||||
queues map[uuid.UUID]*logQueue
|
||||
logger slog.Logger
|
||||
exceededLogLimit bool
|
||||
outputLen int
|
||||
}
|
||||
|
||||
type logDest interface {
|
||||
BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
|
||||
}
|
||||
|
||||
func newLogSender(logger slog.Logger) *logSender {
|
||||
return &logSender{
|
||||
Cond: sync.NewCond(&sync.Mutex{}),
|
||||
logger: logger,
|
||||
queues: make(map[uuid.UUID]*logQueue),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
|
||||
logger := l.logger.With(slog.F("log_source_id", src))
|
||||
if len(logs) == 0 {
|
||||
logger.Debug(context.Background(), "enqueue called with no logs")
|
||||
return
|
||||
}
|
||||
l.L.Lock()
|
||||
defer l.L.Unlock()
|
||||
if l.exceededLogLimit {
|
||||
logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
|
||||
// don't error, as we also write to file and don't want the overall write to fail
|
||||
return
|
||||
}
|
||||
defer l.Broadcast()
|
||||
q, ok := l.queues[src]
|
||||
if !ok {
|
||||
q = &logQueue{}
|
||||
l.queues[src] = q
|
||||
}
|
||||
for k, log := range logs {
|
||||
// Here we check the queue size before adding a log because we want to queue up slightly
|
||||
// more logs than the database would store to ensure we trigger "logs truncated" at the
|
||||
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
|
||||
// examined the Coder agent logs.
|
||||
if l.outputLen > maxBytesQueued {
|
||||
logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
|
||||
return
|
||||
}
|
||||
pl, err := agentsdk.ProtoFromLog(log)
|
||||
if err != nil {
|
||||
logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
|
||||
return
|
||||
}
|
||||
if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
|
||||
logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
|
||||
continue
|
||||
}
|
||||
q.logs = append(q.logs, pl)
|
||||
l.outputLen += len(pl.Output)
|
||||
}
|
||||
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
|
||||
}
|
||||
|
||||
func (l *logSender) flush(src uuid.UUID) {
|
||||
l.L.Lock()
|
||||
defer l.L.Unlock()
|
||||
defer l.Broadcast()
|
||||
q, ok := l.queues[src]
|
||||
if ok {
|
||||
q.flushRequested = true
|
||||
}
|
||||
// queue might not exist because it's already been flushed and removed from
|
||||
// the map.
|
||||
}
|
||||
|
||||
// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
|
||||
// retry as it is expected that a higher layer retries establishing connection to the agent API and
|
||||
// calls sendLoop again.
|
||||
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
|
||||
l.L.Lock()
|
||||
defer l.L.Unlock()
|
||||
if l.exceededLogLimit {
|
||||
l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
|
||||
// no point in keeping this loop going, if log limit is exceeded, but don't return an
|
||||
// error because we're already handled it
|
||||
return nil
|
||||
}
|
||||
|
||||
ctxDone := false
|
||||
defer l.logger.Debug(ctx, "sendLoop exiting")
|
||||
|
||||
// wake 4 times per flush interval to check if anything needs to be flushed
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
go func() {
|
||||
tkr := time.NewTicker(flushInterval / 4)
|
||||
defer tkr.Stop()
|
||||
for {
|
||||
select {
|
||||
// also monitor the context here, so we notice immediately, rather
|
||||
// than waiting for the next tick or logs
|
||||
case <-ctx.Done():
|
||||
l.L.Lock()
|
||||
ctxDone = true
|
||||
l.L.Unlock()
|
||||
l.Broadcast()
|
||||
return
|
||||
case <-tkr.C:
|
||||
l.Broadcast()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
for !ctxDone && !l.hasPendingWorkLocked() {
|
||||
l.Wait()
|
||||
}
|
||||
if ctxDone {
|
||||
return nil
|
||||
}
|
||||
|
||||
src, q := l.getPendingWorkLocked()
|
||||
logger := l.logger.With(slog.F("log_source_id", src))
|
||||
q.flushRequested = false // clear flag since we're now flushing
|
||||
req := &proto.BatchCreateLogsRequest{
|
||||
LogSourceId: src[:],
|
||||
}
|
||||
|
||||
// outputToSend keeps track of the size of the protobuf message we send, while
|
||||
// outputToRemove keeps track of the size of the output we'll remove from the queues on
|
||||
// success. They are different because outputToSend also counts protocol message overheads.
|
||||
outputToSend := 0
|
||||
outputToRemove := 0
|
||||
n := 0
|
||||
for n < len(q.logs) {
|
||||
log := q.logs[n]
|
||||
outputToSend += len(log.Output) + overheadPerLog
|
||||
if outputToSend > maxBytesPerBatch {
|
||||
break
|
||||
}
|
||||
req.Logs = append(req.Logs, log)
|
||||
n++
|
||||
outputToRemove += len(log.Output)
|
||||
}
|
||||
|
||||
l.L.Unlock()
|
||||
logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
|
||||
resp, err := dest.BatchCreateLogs(ctx, req)
|
||||
l.L.Lock()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to upload logs: %w", err)
|
||||
}
|
||||
if resp.LogLimitExceeded {
|
||||
l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
|
||||
l.exceededLogLimit = true
|
||||
// no point in keeping anything we have queued around, server will not accept them
|
||||
l.queues = make(map[uuid.UUID]*logQueue)
|
||||
// We've handled the error as best as we can. We don't want the server limit to grind
|
||||
// other things to a halt, so this is all we can do.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Since elsewhere we only append to the logs, here we can remove them
|
||||
// since we successfully sent them. First we nil the pointers though,
|
||||
// so that they can be gc'd.
|
||||
for i := 0; i < n; i++ {
|
||||
q.logs[i] = nil
|
||||
}
|
||||
q.logs = q.logs[n:]
|
||||
l.outputLen -= outputToRemove
|
||||
if len(q.logs) == 0 {
|
||||
// no empty queues
|
||||
delete(l.queues, src)
|
||||
continue
|
||||
}
|
||||
q.lastFlush = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (l *logSender) hasPendingWorkLocked() bool {
|
||||
for _, q := range l.queues {
|
||||
if time.Since(q.lastFlush) > flushInterval {
|
||||
return true
|
||||
}
|
||||
if q.flushRequested {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
|
||||
// take the one it's been the longest since we've flushed, so that we have some sense of
|
||||
// fairness across sources
|
||||
var earliestFlush time.Time
|
||||
for is, iq := range l.queues {
|
||||
if q == nil || iq.lastFlush.Before(earliestFlush) {
|
||||
src = is
|
||||
q = iq
|
||||
earliestFlush = iq.lastFlush
|
||||
}
|
||||
}
|
||||
return src, q
|
||||
}
|
|
@ -0,0 +1,400 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestLogSender_Mainline(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCtx := testutil.Context(t, testutil.WaitShort)
|
||||
ctx, cancel := context.WithCancel(testCtx)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
|
||||
ls1 := uuid.UUID{0x11}
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 0, src 1",
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
|
||||
ls2 := uuid.UUID{0x22}
|
||||
uut.enqueue(ls2,
|
||||
agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 0, src 2",
|
||||
Level: codersdk.LogLevelError,
|
||||
},
|
||||
agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 1, src 2",
|
||||
Level: codersdk.LogLevelWarn,
|
||||
},
|
||||
)
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
// since neither source has even been flushed, it should immediately flush
|
||||
// both, although the order is not controlled
|
||||
var logReqs []*proto.BatchCreateLogsRequest
|
||||
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
for _, req := range logReqs {
|
||||
require.NotNil(t, req)
|
||||
srcID, err := uuid.FromBytes(req.LogSourceId)
|
||||
require.NoError(t, err)
|
||||
switch srcID {
|
||||
case ls1:
|
||||
require.Len(t, req.Logs, 1)
|
||||
require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
|
||||
require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
|
||||
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
|
||||
case ls2:
|
||||
require.Len(t, req.Logs, 2)
|
||||
require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
|
||||
require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
|
||||
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
|
||||
require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
|
||||
require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
|
||||
require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
|
||||
default:
|
||||
t.Fatal("unknown log source")
|
||||
}
|
||||
}
|
||||
|
||||
t1 := dbtime.Now()
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t1,
|
||||
Output: "test log 1, src 1",
|
||||
Level: codersdk.LogLevelDebug,
|
||||
})
|
||||
uut.flush(ls1)
|
||||
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
// give ourselves a 25% buffer if we're right on the cusp of a tick
|
||||
require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
|
||||
require.NotNil(t, req)
|
||||
require.Len(t, req.Logs, 1)
|
||||
require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
|
||||
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
|
||||
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
|
||||
|
||||
cancel()
|
||||
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// we can still enqueue more logs after sendLoop returns
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t1,
|
||||
Output: "test log 2, src 1",
|
||||
Level: codersdk.LogLevelTrace,
|
||||
})
|
||||
}
|
||||
|
||||
func TestLogSender_LogLimitExceeded(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
|
||||
ls1 := uuid.UUID{0x11}
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 0, src 1",
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps,
|
||||
&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
|
||||
|
||||
err := testutil.RequireRecvCtx(ctx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// we can still enqueue more logs after sendLoop returns, but they don't
|
||||
// actually get enqueued
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 2, src 1",
|
||||
Level: codersdk.LogLevelTrace,
|
||||
})
|
||||
uut.L.Lock()
|
||||
require.Len(t, uut.queues, 0)
|
||||
uut.L.Unlock()
|
||||
|
||||
// Also, if we run sendLoop again, it should immediately exit.
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
err = testutil.RequireRecvCtx(ctx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestLogSender_SkipHugeLog(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCtx := testutil.Context(t, testutil.WaitShort)
|
||||
ctx, cancel := context.WithCancel(testCtx)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
ls1 := uuid.UUID{0x11}
|
||||
// since we add some overhead to the actual length of the output, a log just
|
||||
// under the perBatch limit will not be accepted.
|
||||
hugeLog := make([]byte, maxBytesPerBatch-1)
|
||||
for i := range hugeLog {
|
||||
hugeLog[i] = 'q'
|
||||
}
|
||||
uut.enqueue(ls1,
|
||||
agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: string(hugeLog),
|
||||
Level: codersdk.LogLevelInfo,
|
||||
},
|
||||
agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 1, src 1",
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
require.Len(t, req.Logs, 1, "it should skip the huge log")
|
||||
require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
|
||||
require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
|
||||
cancel()
|
||||
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestLogSender_Batch(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCtx := testutil.Context(t, testutil.WaitShort)
|
||||
ctx, cancel := context.WithCancel(testCtx)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
ls1 := uuid.UUID{0x11}
|
||||
var logs []agentsdk.Log
|
||||
for i := 0; i < 60000; i++ {
|
||||
logs = append(logs, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "r",
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
}
|
||||
uut.enqueue(ls1, logs...)
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
|
||||
// is about 21 bytes.
|
||||
gotLogs := 0
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
gotLogs += len(req.Logs)
|
||||
wire, err := protobuf.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
gotLogs += len(req.Logs)
|
||||
wire, err = protobuf.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
|
||||
require.Equal(t, 60000, gotLogs)
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
|
||||
cancel()
|
||||
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestLogSender_MaxQueuedLogs(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCtx := testutil.Context(t, testutil.WaitShort)
|
||||
ctx, cancel := context.WithCancel(testCtx)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
ls1 := uuid.UUID{0x11}
|
||||
n := 4
|
||||
hugeLog := make([]byte, maxBytesQueued/n)
|
||||
for i := range hugeLog {
|
||||
hugeLog[i] = 'q'
|
||||
}
|
||||
var logs []agentsdk.Log
|
||||
for i := 0; i < n; i++ {
|
||||
logs = append(logs, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: string(hugeLog),
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
}
|
||||
uut.enqueue(ls1, logs...)
|
||||
|
||||
// we're now right at the limit of output
|
||||
require.Equal(t, maxBytesQueued, uut.outputLen)
|
||||
|
||||
// adding more logs should not error...
|
||||
ls2 := uuid.UUID{0x22}
|
||||
uut.enqueue(ls2, logs...)
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
// It should still queue up one log from source #2, so that we would exceed the database
|
||||
// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
|
||||
// #1 come in 2 updates, plus 1 update for source #2.
|
||||
logsBySource := make(map[uuid.UUID]int)
|
||||
for i := 0; i < 3; i++ {
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
srcID, err := uuid.FromBytes(req.LogSourceId)
|
||||
require.NoError(t, err)
|
||||
logsBySource[srcID] += len(req.Logs)
|
||||
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
|
||||
}
|
||||
require.Equal(t, map[uuid.UUID]int{
|
||||
ls1: n,
|
||||
ls2: 1,
|
||||
}, logsBySource)
|
||||
|
||||
cancel()
|
||||
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestLogSender_SendError(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
fDest := newFakeLogDest()
|
||||
expectedErr := xerrors.New("test")
|
||||
fDest.err = expectedErr
|
||||
uut := newLogSender(logger)
|
||||
|
||||
t0 := dbtime.Now()
|
||||
|
||||
ls1 := uuid.UUID{0x11}
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 0, src 1",
|
||||
Level: codersdk.LogLevelInfo,
|
||||
})
|
||||
|
||||
loopErr := make(chan error, 1)
|
||||
go func() {
|
||||
err := uut.sendLoop(ctx, fDest)
|
||||
loopErr <- err
|
||||
}()
|
||||
|
||||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
|
||||
require.NotNil(t, req)
|
||||
|
||||
err := testutil.RequireRecvCtx(ctx, t, loopErr)
|
||||
require.ErrorIs(t, err, expectedErr)
|
||||
|
||||
// we can still enqueue more logs after sendLoop returns
|
||||
uut.enqueue(ls1, agentsdk.Log{
|
||||
CreatedAt: t0,
|
||||
Output: "test log 2, src 1",
|
||||
Level: codersdk.LogLevelTrace,
|
||||
})
|
||||
uut.L.Lock()
|
||||
require.Len(t, uut.queues, 1)
|
||||
uut.L.Unlock()
|
||||
}
|
||||
|
||||
type fakeLogDest struct {
|
||||
reqs chan *proto.BatchCreateLogsRequest
|
||||
resps chan *proto.BatchCreateLogsResponse
|
||||
err error
|
||||
}
|
||||
|
||||
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
|
||||
// clone the logs so that modifications the sender makes don't affect our tests. In production
|
||||
// these would be serialized/deserialized so we don't have to worry too much.
|
||||
req.Logs = slices.Clone(req.Logs)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case f.reqs <- req:
|
||||
if f.err != nil {
|
||||
return nil, f.err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case resp := <-f.resps:
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newFakeLogDest() *fakeLogDest {
|
||||
return &fakeLogDest{
|
||||
reqs: make(chan *proto.BatchCreateLogsRequest),
|
||||
resps: make(chan *proto.BatchCreateLogsResponse),
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
|
@ -298,3 +299,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
|
|||
}
|
||||
return pReq, nil
|
||||
}
|
||||
|
||||
func ProtoFromLog(log Log) (*proto.Log, error) {
|
||||
lvl, ok := proto.Log_Level_value[strings.ToUpper(string(log.Level))]
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf("unknown log level: %s", log.Level)
|
||||
}
|
||||
return &proto.Log{
|
||||
CreatedAt: timestamppb.New(log.CreatedAt),
|
||||
Output: log.Output,
|
||||
Level: proto.Log_Level(lvl),
|
||||
}, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue