fix(coderd): prevent lost messages in `watchWorkspaceAgentMetadata` (#7934)

* fix(codersdk): wait for subscription in WatchWorkspaceAgentMetadata
* fix(coderd): subscribe before sending initial metadata event
* test(coderd): add retries to TestWorkspaceAgent_Metadata to avoid flake
This commit is contained in:
Mathias Fredriksson 2023-06-13 15:21:06 +03:00 committed by GitHub
parent 518300a26c
commit 1d0fae83a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 17 deletions

View File

@ -1434,9 +1434,6 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
})
}
// Send initial metadata.
sendMetadata(true)
// We debounce metadata updates to avoid overloading the frontend when
// an agent is sending a lot of updates.
pubsubDebounce := debounce.New(time.Second)
@ -1444,7 +1441,8 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
pubsubDebounce = debounce.New(time.Millisecond * 100)
}
// Send metadata on updates.
// Send metadata on updates, we must ensure subscription before sending
// initial metadata to guarantee that events in-between are not missed.
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
pubsubDebounce(func() {
sendMetadata(true)
@ -1456,12 +1454,14 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
}
defer cancelSub()
// Send initial metadata.
sendMetadata(true)
for {
select {
case <-senderClosed:
return
case <-refreshTicker.C:
break
}
// Avoid spamming the DB with reads we know there are no updates. We want

View File

@ -1268,11 +1268,6 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
var update []codersdk.WorkspaceAgentMetadata
check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata) {
require.Equal(t, want.Value, got.Result.Value)
require.Equal(t, want.Error, got.Result.Error)
}
wantMetadata1 := codersdk.WorkspaceAgentMetadataResult{
CollectedAt: time.Now(),
Value: "bar",
@ -1285,17 +1280,38 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
recvUpdate := func() []codersdk.WorkspaceAgentMetadata {
select {
case <-ctx.Done():
t.Fatalf("context done: %v", ctx.Err())
case err := <-errors:
t.Fatalf("error watching metadata: %v", err)
return nil
case update := <-updates:
return update
}
return nil
}
check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata, retry bool) {
// We can't trust the order of the updates due to timers and debounces,
// so let's check a few times more.
for i := 0; retry && i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ {
update = recvUpdate()
for _, m := range update {
if m.Description.Key == got.Description.Key {
got = m
break
}
}
}
ok1 := assert.Equal(t, want.Value, got.Result.Value)
ok2 := assert.Equal(t, want.Error, got.Result.Error)
if !ok1 || !ok2 {
require.FailNow(t, "check failed")
}
}
update = recvUpdate()
require.Len(t, update, 3)
check(wantMetadata1, update[0])
check(wantMetadata1, update[0], false)
// The second metadata result is not yet posted.
require.Zero(t, update[1].Result.CollectedAt)
@ -1303,14 +1319,14 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
post("foo2", wantMetadata2)
update = recvUpdate()
require.Len(t, update, 3)
check(wantMetadata1, update[0])
check(wantMetadata2, update[1])
check(wantMetadata1, update[0], true)
check(wantMetadata2, update[1], true)
wantMetadata1.Error = "error"
post("foo1", wantMetadata1)
update = recvUpdate()
require.Len(t, update, 3)
check(wantMetadata1, update[0])
check(wantMetadata1, update[0], true)
const maxValueLen = 32 << 10
tooLongValueMetadata := wantMetadata1
@ -1319,6 +1335,9 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
tooLongValueMetadata.CollectedAt = time.Now()
post("foo3", tooLongValueMetadata)
got := recvUpdate()[2]
for i := 0; i < 2 && len(got.Result.Value) != maxValueLen; i++ {
got = recvUpdate()[2]
}
require.Len(t, got.Result.Value, maxValueLen)
require.NotEmpty(t, got.Result.Error)

View File

@ -304,6 +304,7 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID)
metadataChan := make(chan []WorkspaceAgentMetadata, 256)
ready := make(chan struct{})
watch := func() error {
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/watch-metadata", id), nil)
if err != nil {
@ -316,12 +317,12 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID)
nextEvent := ServerSentEventReader(ctx, res.Body)
defer res.Body.Close()
firstEvent := true
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
break
}
sse, err := nextEvent()
@ -329,6 +330,11 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID)
return err
}
if firstEvent {
close(ready) // Only close ready after the first event is received.
firstEvent = false
}
b, ok := sse.Data.([]byte)
if !ok {
return xerrors.Errorf("unexpected data type: %T", sse.Data)
@ -358,9 +364,18 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID)
errorChan := make(chan error, 1)
go func() {
defer close(errorChan)
errorChan <- watch()
err := watch()
select {
case <-ready:
default:
close(ready) // Error before first event.
}
errorChan <- err
}()
// Wait until first event is received and the subscription is registered.
<-ready
return metadataChan, errorChan
}