chore(healthcheck): fix DERP test flakes (#7211)

This commit is contained in:
Colin Adler 2023-04-19 20:03:05 -05:00 committed by GitHub
parent f60b5579a7
commit 5f5edb18b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 161 additions and 41 deletions

16
coderd/apidoc/docs.go generated
View File

@ -9749,6 +9749,19 @@ const docTemplate = `{
"ParameterSourceSchemeData"
]
},
"derp.ServerInfoMessage": {
"type": "object",
"properties": {
"tokenBucketBytesBurst": {
"description": "TokenBucketBytesBurst is how many bytes the server will\nallow to burst, temporarily violating\nTokenBucketBytesPerSecond.\n\nZero means unspecified. There might be a limit, but the\nclient need not try to respect it.",
"type": "integer"
},
"tokenBucketBytesPerSecond": {
"description": "TokenBucketBytesPerSecond is how many bytes per second the\nserver says it will accept, including all framing bytes.\n\nZero means unspecified. There might be a limit, but the\nclient need not try to respect it.",
"type": "integer"
}
}
},
"healthcheck.AccessURLReport": {
"type": "object",
"properties": {
@ -9795,6 +9808,9 @@ const docTemplate = `{
"node": {
"$ref": "#/definitions/tailcfg.DERPNode"
},
"node_info": {
"$ref": "#/definitions/derp.ServerInfoMessage"
},
"round_trip_ping": {
"type": "integer"
},

View File

@ -8812,6 +8812,19 @@
"ParameterSourceSchemeData"
]
},
"derp.ServerInfoMessage": {
"type": "object",
"properties": {
"tokenBucketBytesBurst": {
"description": "TokenBucketBytesBurst is how many bytes the server will\nallow to burst, temporarily violating\nTokenBucketBytesPerSecond.\n\nZero means unspecified. There might be a limit, but the\nclient need not try to respect it.",
"type": "integer"
},
"tokenBucketBytesPerSecond": {
"description": "TokenBucketBytesPerSecond is how many bytes per second the\nserver says it will accept, including all framing bytes.\n\nZero means unspecified. There might be a limit, but the\nclient need not try to respect it.",
"type": "integer"
}
}
},
"healthcheck.AccessURLReport": {
"type": "object",
"properties": {
@ -8858,6 +8871,9 @@
"node": {
"$ref": "#/definitions/tailcfg.DERPNode"
},
"node_info": {
"$ref": "#/definitions/derp.ServerInfoMessage"
},
"round_trip_ping": {
"type": "integer"
},

View File

@ -11,7 +11,6 @@ import (
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"tailscale.com/derp"
"tailscale.com/derp/derphttp"
@ -21,6 +20,8 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/types/key"
tslogger "tailscale.com/types/logger"
"github.com/coder/coder/coderd/util/ptr"
)
type DERPReport struct {
@ -48,11 +49,12 @@ type DERPNodeReport struct {
Healthy bool `json:"healthy"`
Node *tailcfg.DERPNode `json:"node"`
CanExchangeMessages bool `json:"can_exchange_messages"`
RoundTripPing time.Duration `json:"round_trip_ping"`
UsesWebsocket bool `json:"uses_websocket"`
ClientLogs [][]string `json:"client_logs"`
ClientErrs [][]error `json:"client_errs"`
ServerInfo derp.ServerInfoMessage `json:"node_info"`
CanExchangeMessages bool `json:"can_exchange_messages"`
RoundTripPing time.Duration `json:"round_trip_ping"`
UsesWebsocket bool `json:"uses_websocket"`
ClientLogs [][]string `json:"client_logs"`
ClientErrs [][]error `json:"client_errs"`
STUN DERPStunReport `json:"stun"`
}
@ -161,8 +163,19 @@ func (r *DERPNodeReport) Run(ctx context.Context) {
r.ClientLogs = [][]string{}
r.ClientErrs = [][]error{}
r.doExchangeMessage(ctx)
r.doSTUNTest(ctx)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
r.doExchangeMessage(ctx)
}()
go func() {
defer wg.Done()
r.doSTUNTest(ctx)
}()
wg.Wait()
// We can't exchange messages with the node,
if (!r.CanExchangeMessages && !r.Node.STUNOnly) ||
@ -181,8 +194,13 @@ func (r *DERPNodeReport) doExchangeMessage(ctx context.Context) {
return
}
var peerKey atomic.Pointer[key.NodePublic]
eg, ctx := errgroup.WithContext(ctx)
var (
peerKey atomic.Pointer[key.NodePublic]
lastSent atomic.Pointer[time.Time]
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg := &sync.WaitGroup{}
receive, receiveID, err := r.derpClient(ctx, r.derpURL())
if err != nil {
@ -190,51 +208,64 @@ func (r *DERPNodeReport) doExchangeMessage(ctx context.Context) {
}
defer receive.Close()
eg.Go(func() error {
wg.Add(2)
go func() {
defer wg.Done()
defer receive.Close()
pkt, err := r.recvData(receive)
if err != nil {
r.writeClientErr(receiveID, xerrors.Errorf("recv derp message: %w", err))
return err
return
}
if *peerKey.Load() != pkt.Source {
r.writeClientErr(receiveID, xerrors.Errorf("received pkt from unknown peer: %s", pkt.Source.ShortString()))
return err
return
}
t, err := time.Parse(time.RFC3339Nano, string(pkt.Data))
if err != nil {
r.writeClientErr(receiveID, xerrors.Errorf("parse time from peer: %w", err))
return err
}
t := lastSent.Load()
r.mu.Lock()
r.CanExchangeMessages = true
r.RoundTripPing = time.Since(t)
r.RoundTripPing = time.Since(*t)
r.mu.Unlock()
return nil
})
eg.Go(func() error {
cancel()
}()
go func() {
defer wg.Done()
send, sendID, err := r.derpClient(ctx, r.derpURL())
if err != nil {
return err
return
}
defer send.Close()
key := send.SelfPublicKey()
peerKey.Store(&key)
err = send.Send(receive.SelfPublicKey(), []byte(time.Now().Format(time.RFC3339Nano)))
if err != nil {
r.writeClientErr(sendID, xerrors.Errorf("send derp message: %w", err))
return err
}
return nil
})
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
_ = eg.Wait()
var iter uint8
for {
lastSent.Store(ptr.Ref(time.Now()))
err = send.Send(receive.SelfPublicKey(), []byte{iter})
if err != nil {
r.writeClientErr(sendID, xerrors.Errorf("send derp message: %w", err))
return
}
iter++
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()
wg.Wait()
}
func (r *DERPNodeReport) doSTUNTest(ctx context.Context) {
@ -378,7 +409,7 @@ func (r *DERPNodeReport) derpClient(ctx context.Context, derpURL *url.URL) (*der
return client, id, nil
}
func (*DERPNodeReport) recvData(client *derphttp.Client) (derp.ReceivedPacket, error) {
func (r *DERPNodeReport) recvData(client *derphttp.Client) (derp.ReceivedPacket, error) {
for {
msg, err := client.Recv()
if err != nil {
@ -388,6 +419,10 @@ func (*DERPNodeReport) recvData(client *derphttp.Client) (derp.ReceivedPacket, e
switch msg := msg.(type) {
case derp.ReceivedPacket:
return msg, nil
case derp.ServerInfoMessage:
r.mu.Lock()
r.ServerInfo = msg
r.mu.Unlock()
default:
// Drop all others!
}

View File

@ -19,6 +19,7 @@ import (
"github.com/coder/coder/coderd/healthcheck"
"github.com/coder/coder/tailnet"
"github.com/coder/coder/testutil"
)
//nolint:tparallel
@ -66,8 +67,7 @@ func TestDERP(t *testing.T) {
for _, node := range region.NodeReports {
assert.True(t, node.Healthy)
assert.True(t, node.CanExchangeMessages)
// TODO: test this without serializing time.Time over the wire.
// assert.Positive(t, node.RoundTripPing)
assert.Positive(t, node.RoundTripPing)
assert.Len(t, node.ClientLogs, 2)
assert.Len(t, node.ClientLogs[0], 1)
assert.Len(t, node.ClientErrs[0], 0)
@ -81,9 +81,13 @@ func TestDERP(t *testing.T) {
}
})
t.Run("OK/Tailscale/Dallas", func(t *testing.T) {
t.Run("Tailscale/Dallas/OK", func(t *testing.T) {
t.Parallel()
if testutil.InCI() {
t.Skip("This test depends on reaching out over the network to Tailscale servers, which is inherently flaky.")
}
derpSrv := derp.NewServer(key.NewNode(), func(format string, args ...any) { t.Logf(format, args...) })
defer derpSrv.Close()
srv := httptest.NewServer(derphttp.Handler(derpSrv))
@ -107,8 +111,7 @@ func TestDERP(t *testing.T) {
for _, node := range region.NodeReports {
assert.True(t, node.Healthy)
assert.True(t, node.CanExchangeMessages)
// TODO: test this without serializing time.Time over the wire.
// assert.Positive(t, node.RoundTripPing)
assert.Positive(t, node.RoundTripPing)
assert.Len(t, node.ClientLogs, 2)
assert.Len(t, node.ClientLogs[0], 1)
assert.Len(t, node.ClientErrs[0], 0)
@ -171,13 +174,12 @@ func TestDERP(t *testing.T) {
for _, node := range region.NodeReports {
assert.False(t, node.Healthy)
assert.True(t, node.CanExchangeMessages)
// TODO: test this without serializing time.Time over the wire.
// assert.Positive(t, node.RoundTripPing)
assert.Positive(t, node.RoundTripPing)
assert.Len(t, node.ClientLogs, 2)
assert.Len(t, node.ClientLogs[0], 3)
assert.Len(t, node.ClientLogs[1], 3)
assert.Len(t, node.ClientErrs, 2)
assert.Len(t, node.ClientErrs[0], 1)
assert.Len(t, node.ClientErrs[0], 1) // this
assert.Len(t, node.ClientErrs[1], 1)
assert.True(t, node.UsesWebsocket)
@ -188,7 +190,7 @@ func TestDERP(t *testing.T) {
}
})
t.Run("OK/STUNOnly", func(t *testing.T) {
t.Run("STUNOnly/OK", func(t *testing.T) {
t.Parallel()
var (

View File

@ -103,6 +103,10 @@ curl -X GET http://coder-server:8080/api/v2/debug/health \
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -158,6 +162,10 @@ curl -X GET http://coder-server:8080/api/v2/debug/health \
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,

View File

@ -5563,6 +5563,24 @@ Parameter represents a set value for the scope.
| `none` |
| `data` |
## derp.ServerInfoMessage
```json
{
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
}
```
### Properties
| Name | Type | Required | Restrictions | Description |
| ------------------------------------------------------------------------------------------ | ------- | -------- | ------------ | ------------------------------------------------------------------------------------------------------------------------ |
| `tokenBucketBytesBurst` | integer | false | | Tokenbucketbytesburst is how many bytes the server will allow to burst, temporarily violating TokenBucketBytesPerSecond. |
| Zero means unspecified. There might be a limit, but the client need not try to respect it. |
| `tokenBucketBytesPerSecond` | integer | false | | Tokenbucketbytespersecond is how many bytes per second the server says it will accept, including all framing bytes. |
| Zero means unspecified. There might be a limit, but the client need not try to respect it. |
## healthcheck.AccessURLReport
```json
@ -5607,6 +5625,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -5626,6 +5648,7 @@ Parameter represents a set value for the scope.
| `client_logs` | array of array | false | | |
| `healthy` | boolean | false | | |
| `node` | [tailcfg.DERPNode](#tailcfgderpnode) | false | | |
| `node_info` | [derp.ServerInfoMessage](#derpserverinfomessage) | false | | |
| `round_trip_ping` | integer | false | | |
| `stun` | [healthcheck.DERPStunReport](#healthcheckderpstunreport) | false | | |
| `uses_websocket` | boolean | false | | |
@ -5655,6 +5678,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -5758,6 +5785,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -5813,6 +5844,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -5947,6 +5982,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,
@ -6002,6 +6041,10 @@ Parameter represents a set value for the scope.
"stunport": 0,
"stuntestIP": "string"
},
"node_info": {
"tokenBucketBytesBurst": 0,
"tokenBucketBytesPerSecond": 0
},
"round_trip_ping": 0,
"stun": {
"canSTUN": true,