feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters
These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.
* refactor: Move all HTTP routes to top-level struct
Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.
Our HTTP routes cannot have conflicts, so neither should
function naming.
* Add provisioner daemon routes
* Add periodic updates
* Skip pubsub if short
* Return jobs with WorkspaceHistory
* Add endpoints for extracting singular history
* The full end-to-end operation works
* fix: Disable compression for websocket dRPC transport (#145)
There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `[]byte` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.
This is just tracking some experimentation to fix that race condition.
## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass ✅
- Run 7: Peer failure
## Open Questions: ##
### Is `dRPC` or `websocket` at fault for the data race?
It looks like this condition is specifically happening when `dRPC` decides to `SendError`. This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.
From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
- [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
- [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
- and finally the data race happens here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
	...
	out := buf
	out = append(out, control) // <---------
```
This should be fine, since `dRPC` created this buffer, and it is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.
Once `dRPC` is done writing, it _hangs onto the buffer and resets it here_: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73
However... once the websocket implementation gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where we get our race.
In the case where the `[]byte` isn't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).
### Why does cloning on `Read` fail?
We get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```
## Update: ##
We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operation would no longer be run. Trying that out now:
- Run 1: ✅
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3: ✅
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: ✅
* fix: Remove race condition with acquiredJobDone channel (#148)
Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83
__Issue:__ There is a race on the `p.acquiredJobDone` channel - in particular, we can be waiting for the channel to finish (in close) with `<-p.acquiredJobDone` while, in parallel, an `acquireJob` starts and replaces `p.acquiredJobDone` with a new channel. There is a similar race when `close(..)`-ing the channel, which also came up in test runs.
__Fix:__ Instead of recreating the channel every time, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.
* fix: Bump up workspace history timeout (#149)
This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32
Looking at the timing of the test:
```
t.go:56: 2022-02-02 21:33:21.964 [DEBUG] (terraform-provisioner) <provision.go:139> ran apply
t.go:56: 2022-02-02 21:33:21.991 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.050 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.090 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.140 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.195 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.240 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
It appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (i.e., collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.
Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.
In the future - we can look at potentially using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.
Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00
package coderd

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"sort"
	"strconv"

	"github.com/google/uuid"
	"golang.org/x/xerrors"
	"nhooyr.io/websocket"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/dbauthz"
	"github.com/coder/coder/v2/coderd/database/pubsub"
	"github.com/coder/coder/v2/coderd/httpapi"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/provisionersdk"
)
|
|
|
|
|
2022-02-12 19:34:04 +00:00
|
|
|
// Returns provisioner logs based on query parameters.
|
|
|
|
// The intended usage for a client to stream all logs (with JS API):
|
2023-03-23 19:09:13 +00:00
|
|
|
// GET /logs
|
|
|
|
// GET /logs?after=<id>&follow
|
2022-02-12 19:34:04 +00:00
|
|
|
// The combination of these responses should provide all current logs
|
|
|
|
// to the consumer, and future logs are streamed in the follow request.
|
2022-05-26 03:14:08 +00:00
|
|
|
func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
|
2022-09-21 22:07:00 +00:00
|
|
|
var (
|
2023-03-23 19:09:13 +00:00
|
|
|
ctx = r.Context()
|
|
|
|
logger = api.Logger.With(slog.F("job_id", job.ID))
|
|
|
|
follow = r.URL.Query().Has("follow")
|
|
|
|
afterRaw = r.URL.Query().Get("after")
|
2022-09-21 22:07:00 +00:00
|
|
|
)
|
2022-07-01 21:07:18 +00:00
|
|
|
|
2022-11-07 02:50:34 +00:00
|
|
|
var after int64
|
2022-02-12 19:34:04 +00:00
|
|
|
// Only fetch logs created after the time provided.
|
|
|
|
if afterRaw != "" {
|
2022-11-07 02:50:34 +00:00
|
|
|
var err error
|
|
|
|
after, err = strconv.ParseInt(afterRaw, 10, 64)
|
2022-02-12 19:34:04 +00:00
|
|
|
if err != nil {
|
2022-09-21 22:07:00 +00:00
|
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
2022-06-07 14:33:06 +00:00
|
|
|
Message: "Query param \"after\" must be an integer.",
|
2022-07-13 00:15:02 +00:00
|
|
|
Validations: []codersdk.ValidationError{
|
2022-06-03 21:48:09 +00:00
|
|
|
{Field: "after", Detail: "Must be an integer"},
|
|
|
|
},
|
2022-02-12 19:34:04 +00:00
|
|
|
})
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-27 20:50:53 +00:00
|
|
|
if !follow {
|
2023-06-20 10:30:45 +00:00
|
|
|
fetchAndWriteLogs(ctx, api.Database, job.ID, after, rw)
|
2023-03-27 20:50:53 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-05-31 06:15:58 +00:00
|
|
|
follower := newLogFollower(ctx, logger, api.Database, api.Pubsub, rw, r, job, after)
|
2022-11-16 22:34:06 +00:00
|
|
|
api.WebsocketWaitMutex.Lock()
|
|
|
|
api.WebsocketWaitGroup.Add(1)
|
|
|
|
api.WebsocketWaitMutex.Unlock()
|
|
|
|
defer api.WebsocketWaitGroup.Done()
|
2023-05-31 06:15:58 +00:00
|
|
|
follower.follow()
|
2022-02-12 19:34:04 +00:00
|
|
|
}
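The `after` cursor lets a client page through logs without duplicates: fetch the current backlog, remember the highest log ID seen, then open the `?follow` stream starting from that ID. A minimal sketch of the cursor bookkeeping — the `Log` type and `fetchAfter` helper are hypothetical stand-ins for the real endpoint, not part of coderd:

```go
package main

import "fmt"

// Log is a hypothetical stand-in for codersdk.ProvisionerJobLog.
type Log struct {
	ID     int64
	Output string
}

// fetchAfter simulates GET /logs?after=<id> against an in-memory backlog.
func fetchAfter(backlog []Log, after int64) []Log {
	out := []Log{} // mirror the handler: return an empty slice, never nil
	for _, l := range backlog {
		if l.ID > after {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	backlog := []Log{{1, "init"}, {2, "plan"}, {3, "apply"}}

	// First request: no cursor yet, fetch the whole backlog.
	var cursor int64
	for _, l := range fetchAfter(backlog, cursor) {
		cursor = l.ID
	}
	fmt.Println(cursor) // 3: the highest ID seen, passed as ?after= on the follow request

	// The follow request resumes from the cursor and sees only new logs.
	backlog = append(backlog, Log{4, "done"})
	fmt.Println(len(fetchAfter(backlog, cursor))) // 1
}
```

Because log IDs are monotonically increasing, any log that arrives between the two requests is picked up by the follow stream rather than lost.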

func (api *API) provisionerJobResources(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
	ctx := r.Context()
	if !job.CompletedAt.Valid {
		httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
			Message: "Job hasn't completed!",
		})
		return
	}

	// nolint:gocritic // GetWorkspaceResourcesByJobID is a system function.
	resources, err := api.Database.GetWorkspaceResourcesByJobID(dbauthz.AsSystemRestricted(ctx), job.ID)
	if errors.Is(err, sql.ErrNoRows) {
		err = nil
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching job resources.",
			Detail:  err.Error(),
		})
		return
	}
	resourceIDs := make([]uuid.UUID, 0)
	for _, resource := range resources {
		resourceIDs = append(resourceIDs, resource.ID)
	}

	// nolint:gocritic // GetWorkspaceAgentsByResourceIDs is a system function.
	resourceAgents, err := api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
	if errors.Is(err, sql.ErrNoRows) {
		err = nil
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace agent.",
			Detail:  err.Error(),
		})
		return
	}
	resourceAgentIDs := make([]uuid.UUID, 0)
	for _, agent := range resourceAgents {
		resourceAgentIDs = append(resourceAgentIDs, agent.ID)
	}

	// nolint:gocritic // GetWorkspaceAppsByAgentIDs is a system function.
	apps, err := api.Database.GetWorkspaceAppsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
	if errors.Is(err, sql.ErrNoRows) {
		err = nil
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace applications.",
			Detail:  err.Error(),
		})
		return
	}

	// nolint:gocritic // GetWorkspaceAgentScriptsByAgentIDs is a system function.
	scripts, err := api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
	if errors.Is(err, sql.ErrNoRows) {
		err = nil
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace agent scripts.",
			Detail:  err.Error(),
		})
		return
	}

	// nolint:gocritic // GetWorkspaceAgentLogSourcesByAgentIDs is a system function.
	logSources, err := api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
	if errors.Is(err, sql.ErrNoRows) {
		err = nil
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace agent log sources.",
			Detail:  err.Error(),
		})
		return
	}

	// nolint:gocritic // GetWorkspaceResourceMetadataByResourceIDs is a system function.
	resourceMetadata, err := api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace metadata.",
			Detail:  err.Error(),
		})
		return
	}

	apiResources := make([]codersdk.WorkspaceResource, 0)
	for _, resource := range resources {
		agents := make([]codersdk.WorkspaceAgent, 0)
		for _, agent := range resourceAgents {
			if agent.ResourceID != resource.ID {
				continue
			}
			dbApps := make([]database.WorkspaceApp, 0)
			for _, app := range apps {
				if app.AgentID == agent.ID {
					dbApps = append(dbApps, app)
				}
			}
			dbScripts := make([]database.WorkspaceAgentScript, 0)
			for _, script := range scripts {
				if script.WorkspaceAgentID == agent.ID {
					dbScripts = append(dbScripts, script)
				}
			}
			dbLogSources := make([]database.WorkspaceAgentLogSource, 0)
			for _, logSource := range logSources {
				if logSource.WorkspaceAgentID == agent.ID {
					dbLogSources = append(dbLogSources, logSource)
				}
			}

			apiAgent, err := convertWorkspaceAgent(
				api.DERPMap(), *api.TailnetCoordinator.Load(), agent, convertProvisionedApps(dbApps), convertScripts(dbScripts), convertLogSources(dbLogSources), api.AgentInactiveDisconnectTimeout,
				api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
			)
			if err != nil {
				httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
					Message: "Internal error reading job agent.",
					Detail:  err.Error(),
				})
				return
			}
			agents = append(agents, apiAgent)
		}
		metadata := make([]database.WorkspaceResourceMetadatum, 0)
		for _, field := range resourceMetadata {
			if field.WorkspaceResourceID == resource.ID {
				metadata = append(metadata, field)
			}
		}
		apiResources = append(apiResources, convertWorkspaceResource(resource, agents, metadata))
	}
	sort.Slice(apiResources, func(i, j int) bool {
		return apiResources[i].Name < apiResources[j].Name
	})

	httpapi.Write(ctx, rw, http.StatusOK, apiResources)
}
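The handler above deliberately batch-fetches each child table once (agents, apps, scripts, log sources, metadata) and joins in memory, rather than issuing one query per resource. A self-contained sketch of that group-by-parent pattern, using hypothetical stand-in types rather than the real database rows:

```go
package main

import "fmt"

// Resource and Agent are hypothetical stand-ins for the database rows.
type Resource struct{ ID string }
type Agent struct {
	ID         string
	ResourceID string
}

// groupAgents mirrors the handler's in-memory join: one pass over the
// batch-fetched child rows, instead of one query per parent.
func groupAgents(resources []Resource, agents []Agent) map[string][]Agent {
	byResource := make(map[string][]Agent, len(resources))
	for _, r := range resources {
		byResource[r.ID] = []Agent{} // empty slice, never nil, like the handler
	}
	for _, a := range agents {
		byResource[a.ResourceID] = append(byResource[a.ResourceID], a)
	}
	return byResource
}

func main() {
	resources := []Resource{{ID: "r1"}, {ID: "r2"}}
	agents := []Agent{{"a1", "r1"}, {"a2", "r1"}}
	grouped := groupAgents(resources, agents)
	fmt.Println(len(grouped["r1"]), len(grouped["r2"])) // 2 0
}
```

The handler uses nested loops instead of a map, which is fine at workspace-sized row counts; the map variant avoids the quadratic scan if the counts ever grow.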

func convertProvisionerJobLogs(provisionerJobLogs []database.ProvisionerJobLog) []codersdk.ProvisionerJobLog {
	sdk := make([]codersdk.ProvisionerJobLog, 0, len(provisionerJobLogs))
	for _, log := range provisionerJobLogs {
		sdk = append(sdk, convertProvisionerJobLog(log))
	}
	return sdk
}

func convertProvisionerJobLog(provisionerJobLog database.ProvisionerJobLog) codersdk.ProvisionerJobLog {
	return codersdk.ProvisionerJobLog{
		ID:        provisionerJobLog.ID,
		CreatedAt: provisionerJobLog.CreatedAt,
		Source:    codersdk.LogSource(provisionerJobLog.Source),
		Level:     codersdk.LogLevel(provisionerJobLog.Level),
		Stage:     provisionerJobLog.Stage,
		Output:    provisionerJobLog.Output,
	}
}
|
|
|
|
|
2023-06-20 20:07:18 +00:00
|
|
|
func convertProvisionerJob(pj database.GetProvisionerJobsByIDsWithQueuePositionRow) codersdk.ProvisionerJob {
|
|
|
|
provisionerJob := pj.ProvisionerJob
|
2022-03-22 19:17:50 +00:00
|
|
|
job := codersdk.ProvisionerJob{
|
2023-06-20 20:07:18 +00:00
|
|
|
ID: provisionerJob.ID,
|
|
|
|
CreatedAt: provisionerJob.CreatedAt,
|
|
|
|
Error: provisionerJob.Error.String,
|
|
|
|
ErrorCode: codersdk.JobErrorCode(provisionerJob.ErrorCode.String),
|
|
|
|
FileID: provisionerJob.FileID,
|
|
|
|
Tags: provisionerJob.Tags,
|
|
|
|
QueuePosition: int(pj.QueuePosition),
|
|
|
|
QueueSize: int(pj.QueueSize),
|
feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters
These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.
* refactor: Move all HTTP routes to top-level struct
Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.
Our HTTP routes cannot have conflicts, so neither should
function naming.
* Add provisioner daemon routes
* Add periodic updates
* Skip pubsub if short
* Return jobs with WorkspaceHistory
* Add endpoints for extracting singular history
* The full end-to-end operation works
* fix: Disable compression for websocket dRPC transport (#145)
There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `byte[]` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.
This is just tracking some experimentation to fix that race condition
## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!`
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass ✅
- Run 7: Peer failure
## Open Questions: ##
### Is `dRPC` or `websocket` at fault for the data race?
It looks like this condition is specifically happening when `dRPC` decides to [`SendError`]). This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.
From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
- [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
- [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
- with finally the data race happening here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
...
out := buf
out = append(out, control). // <---------
```
This should be fine, since `dPRC` create this buffer, and is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.
Once `dRPC` is done writing, it _hangs onto the buffer and resets it here__: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73
However... the websocket implementation, once it gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where get our race.
In the case where the `byte[]` aren't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, they both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).
### Why does cloning on `Read` fail?
Get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```
# UPDATE:
We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operaton would no longer be run. Trying that out now:
- Run 1: ✅
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3: ✅
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: ✅
* fix: Remove race condition with acquiredJobDone channel (#148)
Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83
__Issue:__ There is a race in the p.acquiredJobDone chan - in particular, there can be a case where we're waiting on the channel to finish (in close) with <-p.acquiredJobDone, but in parallel, an acquireJob could've been started, which would create a new channel for p.acquiredJobDone. There is a similar race in `close(..)`ing the channel, which also came up in test runs.
__Fix:__ Instead of recreating the channel everytime, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.
* fix: Bump up workspace history timeout (#149)
This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32
Looking at the timing of the test:
```
t.go:56: 2022-02-02 21:33:21.964 [DEBUG] (terraform-provisioner) <provision.go:139> ran apply
t.go:56: 2022-02-02 21:33:21.991 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.050 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.090 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.140 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.195 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.240 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
It appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (ie, collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.
Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.
In the future - we can look at potentially using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.
Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00
|
|
|
}
|
|
|
|
// Applying values optional to the struct.
|
|
|
|
if provisionerJob.StartedAt.Valid {
|
|
|
|
job.StartedAt = &provisionerJob.StartedAt.Time
|
|
|
|
}
|
|
|
|
if provisionerJob.CompletedAt.Valid {
|
|
|
|
job.CompletedAt = &provisionerJob.CompletedAt.Time
|
|
|
|
}
|
2022-10-11 17:50:41 +00:00
|
|
|
if provisionerJob.CanceledAt.Valid {
|
|
|
|
job.CanceledAt = &provisionerJob.CanceledAt.Time
|
|
|
|
}
|
feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters
These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.
* refactor: Move all HTTP routes to top-level struct
Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.
Our HTTP routes cannot have conflicts, so neither should
function naming.
* Add provisioner daemon routes
* Add periodic updates
* Skip pubsub if short
* Return jobs with WorkspaceHistory
* Add endpoints for extracting singular history
* The full end-to-end operation works
* fix: Disable compression for websocket dRPC transport (#145)
There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `byte[]` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.
This is just tracking some experimentation to fix that race condition
## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!`
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass ✅
- Run 7: Peer failure
## Open Questions: ##
### Is `dRPC` or `websocket` at fault for the data race?
It looks like this condition is specifically happening when `dRPC` decides to [`SendError`]). This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.
From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
- [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
- [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
- with finally the data race happening here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
...
out := buf
out = append(out, control). // <---------
```
This should be fine, since `dPRC` create this buffer, and is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.
Once `dRPC` is done writing, it _hangs onto the buffer and resets it here__: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73
However... the websocket implementation, once it gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where get our race.
In the case where the `byte[]` aren't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, they both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).
### Why does cloning on `Read` fail?
Get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```
# UPDATE:
We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operaton would no longer be run. Trying that out now:
- Run 1: ✅
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3: ✅
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: ✅
* fix: Remove race condition with acquiredJobDone channel (#148)
Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83
__Issue:__ There is a race in the p.acquiredJobDone chan - in particular, there can be a case where we're waiting on the channel to finish (in close) with <-p.acquiredJobDone, but in parallel, an acquireJob could've been started, which would create a new channel for p.acquiredJobDone. There is a similar race in `close(..)`ing the channel, which also came up in test runs.
__Fix:__ Instead of recreating the channel everytime, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.
* fix: Bump up workspace history timeout (#149)
This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32
Looking at the timing of the test:
```
t.go:56: 2022-02-02 21:33:21.964 [DEBUG] (terraform-provisioner) <provision.go:139> ran apply
t.go:56: 2022-02-02 21:33:21.991 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.050 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.090 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.140 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.195 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.240 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
It appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (ie, collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.
Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.
In the future - we can look at potentially using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.
Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00
|
|
|
if provisionerJob.WorkerID.Valid {
|
|
|
|
job.WorkerID = &provisionerJob.WorkerID.UUID
|
|
|
|
}
|
2023-10-05 01:57:46 +00:00
|
|
|
job.Status = codersdk.ProvisionerJobStatus(pj.ProvisionerJob.JobStatus)
|
feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters
These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.
* refactor: Move all HTTP routes to top-level struct
Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.
Our HTTP routes cannot have conflicts, so neither should
function naming.
* Add provisioner daemon routes
* Add periodic updates
* Skip pubsub if short
* Return jobs with WorkspaceHistory
* Add endpoints for extracting singular history
* The full end-to-end operation works
* fix: Disable compression for websocket dRPC transport (#145)
There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `byte[]` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.
This is just tracking some experimentation to fix that race condition
## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!`
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass ✅
- Run 7: Peer failure
## Open Questions: ##
### Is `dRPC` or `websocket` at fault for the data race?
It looks like this condition is specifically happening when `dRPC` decides to [`SendError`]). This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.
From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
- [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
- [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
- with finally the data race happening here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
...
out := buf
out = append(out, control). // <---------
```
This should be fine, since `dPRC` create this buffer, and is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.
Once `dRPC` is done writing, it _hangs onto the buffer and resets it here__: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73
However... the websocket implementation, once it gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where get our race.
In the case where the `byte[]` aren't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, they both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).
### Why does cloning on `Read` fail?
Get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```
# UPDATE:
We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operaton would no longer be run. Trying that out now:
- Run 1: ✅
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3: ✅
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: ✅
* fix: Remove race condition with acquiredJobDone channel (#148)
Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83
__Issue:__ There is a race in the p.acquiredJobDone chan - in particular, there can be a case where we're waiting on the channel to finish (in close) with <-p.acquiredJobDone, but in parallel, an acquireJob could've been started, which would create a new channel for p.acquiredJobDone. There is a similar race in `close(..)`ing the channel, which also came up in test runs.
__Fix:__ Instead of recreating the channel everytime, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.
* fix: Bump up workspace history timeout (#149)
This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32
Looking at the timing of the test:
```
t.go:56: 2022-02-02 21:33:21.964 [DEBUG] (terraform-provisioner) <provision.go:139> ran apply
t.go:56: 2022-02-02 21:33:21.991 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.050 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.090 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.140 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.195 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.240 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
It appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (ie, collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.
Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.
In the future - we can look at potentially using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.
Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00
```go
	return job
}

func fetchAndWriteLogs(ctx context.Context, db database.Store, jobID uuid.UUID, after int64, rw http.ResponseWriter) {
	logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
		JobID:        jobID,
		CreatedAfter: after,
	})
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching provisioner logs.",
			Detail:  err.Error(),
		})
		return
	}
	if logs == nil {
		logs = []database.ProvisionerJobLog{}
	}
	httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobLogs(logs))
}

func jobIsComplete(logger slog.Logger, job database.ProvisionerJob) bool {
	status := codersdk.ProvisionerJobStatus(job.JobStatus)
	switch status {
	case codersdk.ProvisionerJobCanceled:
		return true
	case codersdk.ProvisionerJobFailed:
		return true
	case codersdk.ProvisionerJobSucceeded:
		return true
	case codersdk.ProvisionerJobPending:
		return false
	case codersdk.ProvisionerJobCanceling:
		return false
	case codersdk.ProvisionerJobRunning:
		return false
	default:
		logger.Error(context.Background(),
			"can't convert the provisioner job status",
			slog.F("job_id", job.ID), slog.F("status", status))
		return false
	}
}

type logFollower struct {
	ctx    context.Context
	logger slog.Logger
	db     database.Store
	pubsub pubsub.Pubsub
	r      *http.Request
	rw     http.ResponseWriter
	conn   *websocket.Conn

	jobID         uuid.UUID
	after         int64
	complete      bool
	notifications chan provisionersdk.ProvisionerJobLogsNotifyMessage
	errors        chan error
}

func newLogFollower(
	ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub,
	rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64,
) *logFollower {
	return &logFollower{
		ctx:           ctx,
		logger:        logger,
		db:            db,
		pubsub:        ps,
		r:             r,
		rw:            rw,
		jobID:         job.ID,
		after:         after,
		complete:      jobIsComplete(logger, job),
		notifications: make(chan provisionersdk.ProvisionerJobLogsNotifyMessage),
		errors:        make(chan error),
	}
}

func (f *logFollower) follow() {
	var cancel context.CancelFunc
	f.ctx, cancel = context.WithCancel(f.ctx)
	defer cancel()
	// note that we only need to subscribe to updates if the job is not yet
	// complete.
	if !f.complete {
		subCancel, err := f.pubsub.SubscribeWithErr(
			provisionersdk.ProvisionerJobLogsNotifyChannel(f.jobID),
			f.listener,
		)
		if err != nil {
			httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
				Message: "failed to subscribe to job updates",
				Detail:  err.Error(),
			})
			return
		}
		defer subCancel()
		// Move cancel up the stack so it happens before unsubscribing,
		// otherwise we can end up in a deadlock due to how the
		// in-memory pubsub does mutex locking on send/unsubscribe.
		defer cancel()

		// we were provided `complete` prior to starting this subscription, so
		// we also need to check whether the job is now complete, in case the
		// job completed between the last time we queried the job and the start
		// of the subscription. If the job completes after this, we will get
		// a notification on the subscription.
		job, err := f.db.GetProvisionerJobByID(f.ctx, f.jobID)
		if err != nil {
			httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
				Message: "failed to query job",
				Detail:  err.Error(),
			})
			return
		}
		f.complete = jobIsComplete(f.logger, job)
		f.logger.Debug(f.ctx, "queried job after subscribe", slog.F("complete", f.complete))
	}

	var err error
	f.conn, err = websocket.Accept(f.rw, f.r, nil)
	if err != nil {
		httpapi.Write(f.ctx, f.rw, http.StatusBadRequest, codersdk.Response{
			Message: "Failed to accept websocket.",
			Detail:  err.Error(),
		})
		return
	}
	defer f.conn.Close(websocket.StatusNormalClosure, "done")
	go httpapi.Heartbeat(f.ctx, f.conn)

	// query for logs once right away, so we can get historical data from before
	// subscription
	if err := f.query(); err != nil {
		if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
			// neither context expiry, nor EOF, close and log
			f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
			err = f.conn.Close(websocket.StatusInternalError, err.Error())
			if err != nil {
				f.logger.Warn(f.ctx, "failed to close websocket", slog.Error(err))
			}
		}
		return
	}

	// no need to wait if the job is done
	if f.complete {
		return
	}
	for {
		select {
		case err := <-f.errors:
			// we've dropped at least one notification. This can happen if we
			// lose database connectivity. We don't know whether the job is
			// now complete since we could have missed the end of logs message.
			// We could soldier on and retry, but loss of database connectivity
			// is fairly serious, so instead just 500 and bail out. Client
			// can retry and hopefully find a healthier node.
			f.logger.Error(f.ctx, "dropped or corrupted notification", slog.Error(err))
			err = f.conn.Close(websocket.StatusInternalError, err.Error())
			if err != nil {
				f.logger.Warn(f.ctx, "failed to close websocket", slog.Error(err))
			}
			return
		case <-f.ctx.Done():
			// client disconnect
			return
		case n := <-f.notifications:
			if n.EndOfLogs {
				// safe to return here because we started the subscription,
				// and then queried at least once, so we will have already
				// gotten all logs prior to the start of our subscription.
				return
			}
			err = f.query()
			if err != nil {
				if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
					// neither context expiry, nor EOF, close and log
					f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
					err = f.conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("%s", err.Error()))
					if err != nil {
						f.logger.Warn(f.ctx, "failed to close websocket", slog.Error(err))
					}
				}
				return
			}
		}
	}
}

func (f *logFollower) listener(_ context.Context, message []byte, err error) {
	// in this function we always pair writes to channels with a select on the context,
	// otherwise we could block a goroutine if the follow() method exits.
	if err != nil {
		select {
		case <-f.ctx.Done():
		case f.errors <- err:
		}
		return
	}
	var n provisionersdk.ProvisionerJobLogsNotifyMessage
	err = json.Unmarshal(message, &n)
	if err != nil {
		select {
		case <-f.ctx.Done():
		case f.errors <- err:
		}
		return
	}
	select {
	case <-f.ctx.Done():
	case f.notifications <- n:
	}
}

// query fetches the latest job logs from the database and writes them to the
// connection.
func (f *logFollower) query() error {
	f.logger.Debug(f.ctx, "querying logs", slog.F("after", f.after))
	logs, err := f.db.GetProvisionerLogsAfterID(f.ctx, database.GetProvisionerLogsAfterIDParams{
		JobID:        f.jobID,
		CreatedAfter: f.after,
	})
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return xerrors.Errorf("error fetching logs: %w", err)
	}
	for _, log := range logs {
		logB, err := json.Marshal(convertProvisionerJobLog(log))
		if err != nil {
			return xerrors.Errorf("error marshaling log: %w", err)
		}
		err = f.conn.Write(f.ctx, websocket.MessageText, logB)
		if err != nil {
			return xerrors.Errorf("error writing to websocket: %w", err)
		}
		f.after = log.ID
		f.logger.Debug(f.ctx, "wrote log to websocket", slog.F("id", log.ID))
	}
	return nil
}
```