fix(coderd): remove rate limits from agent metadata (#9308)

Include the full update message in the PubSub notification so that
we don't have to refresh metadata from the DB and can avoid rate
limiting.
Ammar Bandukwala 2023-08-24 15:18:42 -05:00 committed by GitHub
parent 7f14b50dbe
commit 630ec55c48
2 changed files with 95 additions and 85 deletions
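Before the diff itself, a minimal, self-contained sketch of the approach described in the commit message may help: the reporting side marshals the whole metadata datum into the pubsub payload, and the watcher unmarshals it straight into an in-memory map instead of re-reading the row from the database behind a rate limiter. The toy pubsub and the simplified datum type below are illustrative stand-ins and do not reproduce coderd's actual Pubsub or database interfaces.

package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// datum is a simplified stand-in for the fields carried in the notification payload.
type datum struct {
	Key         string    `json:"key"`
	Value       string    `json:"value"`
	Error       string    `json:"error"`
	CollectedAt time.Time `json:"collected_at"`
}

// toyPubsub is an in-process stand-in for coderd's Pubsub: Publish fans the
// payload out to every subscriber of the channel.
type toyPubsub struct {
	mu   sync.Mutex
	subs map[string][]func([]byte)
}

func newToyPubsub() *toyPubsub {
	return &toyPubsub{subs: map[string][]func([]byte){}}
}

func (p *toyPubsub) Subscribe(channel string, fn func([]byte)) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.subs[channel] = append(p.subs[channel], fn)
}

func (p *toyPubsub) Publish(channel string, payload []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, fn := range p.subs[channel] {
		fn(payload)
	}
}

func main() {
	ps := newToyPubsub()

	// Watcher side: keep the latest datum per key in memory. The payload
	// already contains the update, so no database read (and therefore no
	// rate limiter) is needed here.
	var (
		mu       sync.Mutex
		metadata = map[string]datum{}
	)
	ps.Subscribe("agent-metadata", func(payload []byte) {
		var update datum
		if err := json.Unmarshal(payload, &update); err != nil {
			return // ignore malformed payloads in this sketch
		}
		mu.Lock()
		metadata[update.Key] = update
		mu.Unlock()
	})

	// Reporting side: marshal the full datum into the notification instead
	// of publishing only the key.
	d := datum{Key: "load", Value: "0.42", CollectedAt: time.Now()}
	payload, err := json.Marshal(d)
	if err != nil {
		panic(err)
	}
	ps.Publish("agent-metadata", payload)

	mu.Lock()
	fmt.Printf("watcher sees: %+v\n", metadata["load"])
	mu.Unlock()
}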

@@ -6,7 +6,6 @@ import (
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
@@ -23,10 +22,9 @@ import (
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/exp/maps"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
@@ -1528,7 +1525,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request) {
key := chi.URLParam(r, "key")
const (
maxValueLen = 32 << 10
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
// NOTIFY limit. Since both value and error can be set, the real
// payload limit is 2 * 2048 * 4/3 (base64 expansion) = 5461 bytes,
// plus a few hundred bytes for JSON syntax, key names, and metadata.
maxValueLen = 2048
maxErrorLen = maxValueLen
)
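As a quick sanity check of the arithmetic in the comment above, the snippet below reproduces the back-of-the-envelope calculation; the 300-byte JSON overhead figure is an illustrative assumption, not a number taken from the code.

package main

import "fmt"

func main() {
	const (
		maxValueLen  = 2048
		maxErrorLen  = maxValueLen
		notifyLimit  = 8000 // Postgres NOTIFY payload limit in bytes
		jsonOverhead = 300  // assumed allowance for syntax, key names, metadata
	)
	// Worst case: value and error both at their limit, expanded by 4/3 via base64.
	worst := (maxValueLen+maxErrorLen)*4/3 + jsonOverhead
	fmt.Println(worst, worst < notifyLimit) // 5761 true
}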
@@ -1571,7 +1572,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request) {
slog.F("value", ellipse(datum.Value, 16)),
)
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
datumJSON, err := json.Marshal(datum)
if err != nil {
httpapi.InternalServerError(rw, err)
return
}
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
if err != nil {
httpapi.InternalServerError(rw, err)
return
@@ -1597,7 +1604,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
)
)
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
// We use a mutex and an atomic flag here instead of channel-based
// synchronization to avoid backpressure problems.
var (
metadataMapMu sync.Mutex
metadataMap = make(map[string]database.WorkspaceAgentMetadatum)
// pendingChanges must only be mutated when metadataMapMu is held.
pendingChanges atomic.Bool
)
// Send metadata on updates. We must subscribe before sending the
// initial metadata to guarantee that events in between are not missed.
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
var update database.UpdateWorkspaceAgentMetadataParams
err := json.Unmarshal(byt, &update)
if err != nil {
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
return
}
log.Debug(ctx, "received metadata update", "key", update.Key)
metadataMapMu.Lock()
defer metadataMapMu.Unlock()
md := metadataMap[update.Key]
md.Value = update.Value
md.Error = update.Error
md.CollectedAt = update.CollectedAt
metadataMap[update.Key] = md
pendingChanges.Store(true)
})
if err != nil {
httpapi.InternalServerError(rw, err)
return
}
defer cancelSub()
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
@@ -1607,97 +1649,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
}
// Prevent handler from returning until the sender is closed.
defer func() {
<-senderClosed
<-sseSenderClosed
}()
const refreshInterval = time.Second * 5
refreshTicker := time.NewTicker(refreshInterval)
defer refreshTicker.Stop()
// We send updates at most once per second.
const sendInterval = time.Second * 1
sendTicker := time.NewTicker(sendInterval)
defer sendTicker.Stop()
var (
lastDBMetaMu sync.Mutex
lastDBMeta []database.WorkspaceAgentMetadatum
)
sendMetadata := func(pull bool) {
log.Debug(ctx, "sending metadata update", "pull", pull)
lastDBMetaMu.Lock()
defer lastDBMetaMu.Unlock()
var err error
if pull {
// We always use the original Request context because it contains
// the RBAC actor.
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error getting metadata.",
Detail: err.Error(),
},
})
return
}
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
return slice.Ascending(a.Key, b.Key)
})
// Avoid sending refresh if the client is about to get a
// fresh update.
refreshTicker.Reset(refreshInterval)
}
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspaceAgentMetadata(lastDBMeta),
})
}
// Note: we previously used a debounce here, but when the rate of metadata updates was too
// high the debounce would never fire.
//
// The rate-limit has its own caveat. If the agent sends a burst of metadata
// but then goes quiet, we will never pull the new metadata and the frontend
// will go stale until refresh. This would only happen if the agent was
// under extreme load. Under normal operations, the interval between metadata
// updates is constant so there is no burst phenomenon.
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
if flag.Lookup("test.v") != nil {
// We essentially disable the rate-limit in tests for determinism.
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
}
// Send metadata on updates. We must subscribe before sending the
// initial metadata to guarantee that events in between are not missed.
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
allow := pubsubRatelimit.Allow()
log.Debug(ctx, "received metadata update", "allow", allow)
if allow {
sendMetadata(true)
}
})
// We always use the original Request context because it contains
// the RBAC actor.
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
if err != nil {
// If we can't successfully pull the initial metadata, pubsub
// updates will be no-op so we may as well terminate the
// connection early.
httpapi.InternalServerError(rw, err)
return
}
defer cancelSub()
metadataMapMu.Lock()
for _, datum := range md {
metadataMap[datum.Key] = datum
}
metadataMapMu.Unlock()
// Send initial metadata.
sendMetadata(true)
var lastSend time.Time
sendMetadata := func() {
metadataMapMu.Lock()
values := maps.Values(metadataMap)
pendingChanges.Store(false)
metadataMapMu.Unlock()
lastSend = time.Now()
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspaceAgentMetadata(values),
})
}
sendMetadata()
for {
select {
case <-senderClosed:
case <-sendTicker.C:
// Even if nothing has changed, we send an update every 5 seconds so
// that the frontend always has an accurate "Result.Age".
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
continue
}
sendMetadata()
case <-sseSenderClosed:
return
case <-refreshTicker.C:
}
// Avoid spamming the DB with reads when we know there are no updates. We want
// to continue sending updates to the frontend so that "Result.Age"
// is always accurate. This way, the frontend doesn't need to
// sync its own clock with the backend.
sendMetadata(false)
}
}
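The send loop above replaces the old debounce-plus-rate-limiter combination with a simpler throttle: a one-second ticker, an atomic "pending changes" flag, and a five-second keepalive so "Result.Age" stays fresh even when nothing changes. Below is a condensed, runnable sketch of that pattern; the variable names and timings follow the handler, while the bursty producer and the send stub exist only for demonstration.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var pendingChanges atomic.Bool

	// Demo producer: a burst of updates, then silence.
	go func() {
		for i := 0; i < 8; i++ {
			pendingChanges.Store(true)
			time.Sleep(300 * time.Millisecond)
		}
	}()

	send := func(reason string) {
		fmt.Println(time.Now().Format("15:04:05"), "send:", reason)
	}

	const keepalive = 5 * time.Second
	sendTicker := time.NewTicker(time.Second)
	defer sendTicker.Stop()

	send("initial")
	lastSend := time.Now()

	done := time.After(12 * time.Second) // end the demo
	for {
		select {
		case <-sendTicker.C:
			// Send only when something changed or the keepalive elapsed,
			// so the frontend's "Result.Age" stays accurate without
			// flooding it during bursts.
			changed := pendingChanges.Swap(false)
			if !changed && time.Since(lastSend) < keepalive {
				continue
			}
			lastSend = time.Now()
			if changed {
				send("update")
			} else {
				send("keepalive")
			}
		case <-done:
			return
		}
	}
}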
@@ -1721,6 +1727,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []codersdk.WorkspaceAgentMetadata {
},
})
}
// Sorting prevents the metadata from jumping around in the frontend.
sort.Slice(result, func(i, j int) bool {
return result[i].Description.Key < result[j].Description.Key
})
return result
}

@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
require.Len(t, update, 3)
check(wantMetadata1, update[0], true)
const maxValueLen = 32 << 10
const maxValueLen = 2048
tooLongValueMetadata := wantMetadata1
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
tooLongValueMetadata.Error = ""