fix(agent): keep track of lastReportIndex between invocations of reportLifecycle() (#13075)

This commit is contained in:
Cian Johnston 2024-04-25 16:54:51 +01:00 committed by GitHub
parent c24b562199
commit 99dda4a43a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 12 additions and 12 deletions

View File

@ -240,10 +240,11 @@ type agent struct {
sshServer *agentssh.Server sshServer *agentssh.Server
sshMaxTimeout time.Duration sshMaxTimeout time.Duration
lifecycleUpdate chan struct{} lifecycleUpdate chan struct{}
lifecycleReported chan codersdk.WorkspaceAgentLifecycle lifecycleReported chan codersdk.WorkspaceAgentLifecycle
lifecycleMu sync.RWMutex // Protects following. lifecycleMu sync.RWMutex // Protects following.
lifecycleStates []agentsdk.PostLifecycleRequest lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.
network *tailnet.Conn network *tailnet.Conn
addresses []netip.Prefix addresses []netip.Prefix
@ -625,7 +626,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
// changes are reported in order. // changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error { func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn) aAPI := proto.NewDRPCAgentClient(conn)
lastReportedIndex := 0 // Start off with the created state without reporting it.
for { for {
select { select {
case <-a.lifecycleUpdate: case <-a.lifecycleUpdate:
@ -636,20 +636,20 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
for { for {
a.lifecycleMu.RLock() a.lifecycleMu.RLock()
lastIndex := len(a.lifecycleStates) - 1 lastIndex := len(a.lifecycleStates) - 1
report := a.lifecycleStates[lastReportedIndex] report := a.lifecycleStates[a.lifecycleLastReportedIndex]
if len(a.lifecycleStates) > lastReportedIndex+1 { if len(a.lifecycleStates) > a.lifecycleLastReportedIndex+1 {
report = a.lifecycleStates[lastReportedIndex+1] report = a.lifecycleStates[a.lifecycleLastReportedIndex+1]
} }
a.lifecycleMu.RUnlock() a.lifecycleMu.RUnlock()
if lastIndex == lastReportedIndex { if lastIndex == a.lifecycleLastReportedIndex {
break break
} }
l, err := agentsdk.ProtoFromLifecycle(report) l, err := agentsdk.ProtoFromLifecycle(report)
if err != nil { if err != nil {
a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report)) a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report))
// Skip this report; there is no point retrying. Maybe we can successfully convert the next one? // Skip this report; there is no point retrying. Maybe we can successfully convert the next one?
lastReportedIndex++ a.lifecycleLastReportedIndex++
continue continue
} }
payload := &proto.UpdateLifecycleRequest{Lifecycle: l} payload := &proto.UpdateLifecycleRequest{Lifecycle: l}
@ -662,13 +662,13 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
} }
logger.Debug(ctx, "successfully reported lifecycle state") logger.Debug(ctx, "successfully reported lifecycle state")
lastReportedIndex++ a.lifecycleLastReportedIndex++
select { select {
case a.lifecycleReported <- report.State: case a.lifecycleReported <- report.State:
case <-a.lifecycleReported: case <-a.lifecycleReported:
a.lifecycleReported <- report.State a.lifecycleReported <- report.State
} }
if lastReportedIndex < lastIndex { if a.lifecycleLastReportedIndex < lastIndex {
// Keep reporting until we've sent all messages, we can't // Keep reporting until we've sent all messages, we can't
// rely on the channel triggering us before the backlog is // rely on the channel triggering us before the backlog is
// consumed. // consumed.