package provisionerd

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"reflect"
	"sync"
	"time"

	"github.com/hashicorp/yamux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/valyala/fasthttp/fasthttputil"
	"go.opentelemetry.io/otel/attribute"
	semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/xerrors"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/coderd/tracing"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/provisionerd/proto"
	"github.com/coder/coder/v2/provisionerd/runner"
	sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
	"github.com/coder/retry"
)

// Dialer represents the function to create a daemon client connection.
type Dialer func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error)
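
// A minimal sketch of a Dialer for wiring up a Server in tests: it simply
// hands back a pre-established client (the "client" variable is an assumption
// for illustration; real dialers open a fresh connection to coderd per call):
//
//	dialer := Dialer(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
//		if err := ctx.Err(); err != nil {
//			return nil, err
//		}
//		return client, nil
//	})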

// ConnectResponse is the response returned asynchronously from Connector.Connect,
// containing either the provisioner client or an error. The Job is also returned
// unaltered, to disambiguate responses if the respCh is shared among multiple jobs.
type ConnectResponse struct {
	Job    *proto.AcquiredJob
	Client sdkproto.DRPCProvisionerClient
	Error  error
}

// Connector allows the provisioner daemon to Connect to a provisioner
// for the given job.
type Connector interface {
	// Connect to the correct provisioner for the given job. The response is
	// delivered asynchronously over the respCh. If the provided context expires,
	// the Connector may stop waiting for the provisioner and return an error
	// response.
	Connect(ctx context.Context, job *proto.AcquiredJob, respCh chan<- ConnectResponse)
}
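
// A minimal sketch of a Connector that always hands back the same provisioner
// client (the "client" field is an assumption for illustration; real
// implementations locate or launch a provisioner per job). Note that the
// response must be delivered asynchronously: respCh is unbuffered and this
// package only reads from it after Connect returns, so a synchronous send
// would deadlock.
//
//	type staticConnector struct {
//		client sdkproto.DRPCProvisionerClient
//	}
//
//	func (c staticConnector) Connect(ctx context.Context, job *proto.AcquiredJob, respCh chan<- ConnectResponse) {
//		go func() {
//			select {
//			case respCh <- ConnectResponse{Job: job, Client: c.client}:
//			case <-ctx.Done():
//			}
//		}()
//	}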

// Options provides customizations to the behavior of a provisioner daemon.
type Options struct {
	Logger         slog.Logger
	TracerProvider trace.TracerProvider
	Metrics        *Metrics

	ForceCancelInterval time.Duration
	UpdateInterval      time.Duration
	LogBufferInterval   time.Duration
	Connector           Connector
}

// New creates and starts a provisioner daemon.
func New(clientDialer Dialer, opts *Options) *Server {
	if opts == nil {
		opts = &Options{}
	}
	if opts.UpdateInterval == 0 {
		opts.UpdateInterval = 5 * time.Second
	}
	if opts.ForceCancelInterval == 0 {
		opts.ForceCancelInterval = 10 * time.Minute
	}
	if opts.LogBufferInterval == 0 {
		opts.LogBufferInterval = 250 * time.Millisecond
	}
	if opts.TracerProvider == nil {
		opts.TracerProvider = trace.NewNoopTracerProvider()
	}
	if opts.Metrics == nil {
		reg := prometheus.NewRegistry()
		mets := NewMetrics(reg)
		opts.Metrics = &mets
	}

	ctx, ctxCancel := context.WithCancel(context.Background())
	daemon := &Server{
		opts:   opts,
		tracer: opts.TracerProvider.Tracer(tracing.TracerName),

		clientDialer: clientDialer,
		clientCh:     make(chan proto.DRPCProvisionerDaemonClient),

		closeContext:   ctx,
		closeCancel:    ctxCancel,
		closedCh:       make(chan struct{}),
		shuttingDownCh: make(chan struct{}),
		acquireDoneCh:  make(chan struct{}),
	}

	daemon.wg.Add(2)
	go daemon.connect()
	go daemon.acquireLoop()
	return daemon
}
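
// A minimal sketch of constructing a daemon (the dialer and connector values
// are assumptions for illustration). Zero-value intervals fall back to the
// defaults above: 5s updates, 10m force-cancel, 250ms log buffering. Shutdown
// and close handling are defined elsewhere in this file:
//
//	srv := New(dialer, &Options{
//		Logger:    logger,
//		Connector: connector,
//	})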

type Server struct {
	opts   *Options
	tracer trace.Tracer

	clientDialer Dialer
	clientCh     chan proto.DRPCProvisionerDaemonClient

	wg sync.WaitGroup

	// mutex protects all subsequent fields
	mutex sync.Mutex
	// closeContext is canceled when we start closing.
	closeContext context.Context
	closeCancel  context.CancelFunc
	// closeError stores the error when closing to return to subsequent callers
	closeError error
	// closingB is set to true when we start closing
	closingB bool
	// closedCh will receive when we complete closing
	closedCh chan struct{}
	// shuttingDownB is set to true when we start graceful shutdown
	shuttingDownB bool
	// shuttingDownCh will receive when we start graceful shutdown
	shuttingDownCh chan struct{}
	// acquireDoneCh will receive when the acquireLoop exits
	acquireDoneCh chan struct{}
	activeJob     *runner.Runner
}

type Metrics struct {
	Runner runner.Metrics
}

func NewMetrics(reg prometheus.Registerer) Metrics {
	auto := promauto.With(reg)

	return Metrics{
		Runner: runner.Metrics{
			ConcurrentJobs: auto.NewGaugeVec(prometheus.GaugeOpts{
				Namespace: "coderd",
				Subsystem: "provisionerd",
				Name:      "jobs_current",
				Help:      "The number of currently running provisioner jobs.",
			}, []string{"provisioner"}),
			NumDaemons: auto.NewGauge(prometheus.GaugeOpts{
				Namespace: "coderd",
				Subsystem: "provisionerd",
				Name:      "num_daemons",
				Help:      "The number of provisioner daemons.",
			}),
			JobTimings: auto.NewHistogramVec(prometheus.HistogramOpts{
				Namespace: "coderd",
				Subsystem: "provisionerd",
				Name:      "job_timings_seconds",
				Help:      "The provisioner job time duration in seconds.",
				Buckets: []float64{
					1, // 1s
					10,
					30,
					60, // 1min
					60 * 5,
					60 * 10,
					60 * 30, // 30min
					60 * 60, // 1hr
				},
			}, []string{"provisioner", "status"}),
			WorkspaceBuilds: auto.NewCounterVec(prometheus.CounterOpts{
				Namespace: "coderd",
				Subsystem: "", // Explicitly empty to make this a top-level metric.
				Name:      "workspace_builds_total",
				Help:      "The number of workspaces started, updated, or deleted.",
			}, []string{"workspace_owner", "workspace_name", "template_name", "template_version", "workspace_transition", "status"}),
		},
	}
}
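
// A minimal sketch of sharing one registry between the daemon and a metrics
// endpoint (promhttp is github.com/prometheus/client_golang/prometheus/promhttp;
// the dialer value is an assumption for illustration):
//
//	reg := prometheus.NewRegistry()
//	mets := NewMetrics(reg)
//	srv := New(dialer, &Options{Metrics: &mets})
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))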

// connect establishes a connection to coderd.
func (p *Server) connect() {
	defer p.opts.Logger.Debug(p.closeContext, "connect loop exited")
	defer p.wg.Done()
	// An exponential back-off occurs when the connection is failing to dial.
	// This is to prevent server spam in case of a coderd outage.
connectLoop:
	for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(p.closeContext); {
		// It's possible for the provisioner daemon to be shut down
		// before the wait is complete!
		if p.isClosed() {
			return
		}
		p.opts.Logger.Debug(p.closeContext, "dialing coderd")
		client, err := p.clientDialer(p.closeContext)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return
			}
			var sdkErr *codersdk.Error
			// If something is wrong with our auth, stop trying to connect.
			if errors.As(err, &sdkErr) && sdkErr.StatusCode() == http.StatusForbidden {
				p.opts.Logger.Error(p.closeContext, "not authorized to dial coderd", slog.Error(err))
				return
			}
			if p.isClosed() {
				return
			}
			p.opts.Logger.Warn(p.closeContext, "coderd client failed to dial", slog.Error(err))
			continue
		}
		p.opts.Logger.Info(p.closeContext, "successfully connected to coderd")
		retrier.Reset()

		// serve the client until we are closed or it disconnects
		for {
			select {
			case <-p.closeContext.Done():
				client.DRPCConn().Close()
				return
			case <-client.DRPCConn().Closed():
				p.opts.Logger.Info(p.closeContext, "connection to coderd closed")
				continue connectLoop
			case p.clientCh <- client:
				continue
			}
		}
	}
}
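
// client blocks until the connect loop delivers a connected coderd client,
// returning false instead if the daemon is closed or begins graceful shutdown
// first.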
func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
	select {
	case <-p.closeContext.Done():
		return nil, false
	case <-p.shuttingDownCh:
		// Shutting down should return a nil client and unblock
		return nil, false
	case client := <-p.clientCh:
		return client, true
	}
}
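
// acquireLoop repeatedly acquires a job from coderd and runs it to completion,
// exiting once the daemon starts closing or shutting down.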
func (p *Server) acquireLoop() {
	defer p.opts.Logger.Debug(p.closeContext, "acquire loop exited")
	defer p.wg.Done()
	defer func() { close(p.acquireDoneCh) }()
	ctx := p.closeContext
	for {
		if p.acquireExit() {
			return
		}
		client, ok := p.client()
		if !ok {
			p.opts.Logger.Debug(ctx, "shut down before client (re) connected")
			return
		}
		p.acquireAndRunOne(client)
	}
}

// acquireExit returns true if the acquire loop should exit
func (p *Server) acquireExit() bool {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.closingB {
		p.opts.Logger.Debug(p.closeContext, "exiting acquire; provisionerd is closing")
		return true
	}
	if p.shuttingDownB {
		p.opts.Logger.Debug(p.closeContext, "exiting acquire; provisionerd is shutting down")
		return true
	}
	return false
}
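
// acquireAndRunOne acquires a single job from coderd, tags a trace span with
// the job's metadata, and asks the Connector for a provisioner to run it,
// failing the job if no provisioner can be reached.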
|
2022-09-16 16:43:22 +00:00
|
|
|
|
2023-09-19 06:25:57 +00:00
|
|
|
func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
|
|
|
|
ctx := p.closeContext
|
|
|
|
p.opts.Logger.Debug(ctx, "start of acquireAndRunOne")
|
|
|
|
job, err := p.acquireGraceful(client)
|
|
|
|
p.opts.Logger.Debug(ctx, "graceful acquire done", slog.F("job_id", job.GetJobId()), slog.Error(err))
|
2022-02-01 18:15:54 +00:00
|
|
|
if err != nil {
|
2022-12-01 22:54:53 +00:00
|
|
|
if errors.Is(err, context.Canceled) ||
|
|
|
|
errors.Is(err, yamux.ErrSessionShutdown) ||
|
|
|
|
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
|
feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters
These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.
* refactor: Move all HTTP routes to top-level struct
Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.
Our HTTP routes cannot have conflicts, so neither should
function naming.
* Add provisioner daemon routes
* Add periodic updates
* Skip pubsub if short
* Return jobs with WorkspaceHistory
* Add endpoints for extracting singular history
* The full end-to-end operation works
* fix: Disable compression for websocket dRPC transport (#145)
There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `byte[]` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.
This is just tracking some experimentation to fix that race condition
## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!`
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass ✅
- Run 7: Peer failure
## Open Questions: ##
### Is `dRPC` or `websocket` at fault for the data race?
It looks like this condition is specifically happening when `dRPC` decides to [`SendError`]). This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.
From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
- [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
- [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
- with finally the data race happening here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
...
out := buf
out = append(out, control). // <---------
```
This should be fine, since `dPRC` create this buffer, and is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.
Once `dRPC` is done writing, it _hangs onto the buffer and resets it here__: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73
However... the websocket implementation, once it gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where get our race.
In the case where the `byte[]` aren't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, they both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).
### Why does cloning on `Read` fail?
Get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```
# UPDATE:
We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operaton would no longer be run. Trying that out now:
- Run 1: ✅
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3: ✅
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: ✅
* fix: Remove race condition with acquiredJobDone channel (#148)
Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83
__Issue:__ There is a race in the p.acquiredJobDone chan - in particular, there can be a case where we're waiting on the channel to finish (in close) with <-p.acquiredJobDone, but in parallel, an acquireJob could've been started, which would create a new channel for p.acquiredJobDone. There is a similar race in `close(..)`ing the channel, which also came up in test runs.
__Fix:__ Instead of recreating the channel everytime, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.
* fix: Bump up workspace history timeout (#149)
This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32
Looking at the timing of the test:
```
t.go:56: 2022-02-02 21:33:21.964 [DEBUG] (terraform-provisioner) <provision.go:139> ran apply
t.go:56: 2022-02-02 21:33:21.991 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.050 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.090 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.140 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.195 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
t.go:56: 2022-02-02 21:33:22.240 [DEBUG] (provisionerd) <provisionerd.go:162> skipping acquire; job is already running
workspacehistory_test.go:122:
Error Trace: workspacehistory_test.go:122
Error: Condition never satisfied
Test: TestWorkspaceHistory/CreateHistory
```
It appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (ie, collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.
Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.
In the future, we can look at using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.
Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00
|
|
|
return
|
|
|
|
}
|
2022-09-16 16:43:22 +00:00
|
|
|
|
2023-06-21 10:00:38 +00:00
|
|
|
p.opts.Logger.Warn(ctx, "provisionerd was unable to acquire job", slog.Error(err))
|
2022-02-01 18:15:54 +00:00
|
|
|
return
|
|
|
|
}
|
2022-02-04 01:13:22 +00:00
|
|
|
if job.JobId == "" {
|
2023-09-19 06:25:57 +00:00
|
|
|
p.opts.Logger.Debug(ctx, "acquire job successfully canceled")
|
2022-02-01 18:15:54 +00:00
|
|
|
return
|
|
|
|
}
|
2022-09-16 16:43:22 +00:00
|
|
|
|
2023-05-03 23:02:35 +00:00
|
|
|
if len(job.TraceMetadata) > 0 {
|
|
|
|
ctx = tracing.MetadataToContext(ctx, job.TraceMetadata)
|
|
|
|
}
|
2022-09-16 16:43:22 +00:00
|
|
|
ctx, span := p.tracer.Start(ctx, tracing.FuncName(), trace.WithAttributes(
|
|
|
|
semconv.ServiceNameKey.String("coderd.provisionerd"),
|
|
|
|
attribute.String("job_id", job.JobId),
|
|
|
|
attribute.String("job_type", reflect.TypeOf(job.GetType()).Elem().Name()),
|
|
|
|
attribute.Int64("job_created_at", job.CreatedAt),
|
|
|
|
attribute.String("initiator_username", job.UserName),
|
|
|
|
attribute.String("provisioner", job.Provisioner),
|
|
|
|
attribute.Int("template_size_bytes", len(job.TemplateSourceArchive)),
|
|
|
|
))
|
|
|
|
defer span.End()
|
|
|
|
|
2023-06-13 18:17:04 +00:00
|
|
|
fields := []any{
|
2023-02-24 00:03:12 +00:00
|
|
|
slog.F("initiator_username", job.UserName),
|
|
|
|
slog.F("provisioner", job.Provisioner),
|
|
|
|
slog.F("job_id", job.JobId),
|
|
|
|
}
|
|
|
|
|
2022-09-16 16:43:22 +00:00
|
|
|
if build := job.GetWorkspaceBuild(); build != nil {
|
2023-02-24 00:03:12 +00:00
|
|
|
fields = append(fields,
|
2023-02-24 04:38:58 +00:00
|
|
|
slog.F("workspace_transition", build.Metadata.WorkspaceTransition.String()),
|
|
|
|
slog.F("workspace_owner", build.Metadata.WorkspaceOwner),
|
2023-02-24 00:03:12 +00:00
|
|
|
slog.F("template_name", build.Metadata.TemplateName),
|
|
|
|
slog.F("template_version", build.Metadata.TemplateVersion),
|
|
|
|
slog.F("workspace_build_id", build.WorkspaceBuildId),
|
|
|
|
slog.F("workspace_id", build.Metadata.WorkspaceId),
|
|
|
|
slog.F("workspace_name", build.WorkspaceName),
|
|
|
|
)
|
|
|
|
|
2022-09-16 16:43:22 +00:00
|
|
|
span.SetAttributes(
|
|
|
|
attribute.String("workspace_build_id", build.WorkspaceBuildId),
|
|
|
|
attribute.String("workspace_id", build.Metadata.WorkspaceId),
|
|
|
|
attribute.String("workspace_name", build.WorkspaceName),
|
|
|
|
attribute.String("workspace_owner_id", build.Metadata.WorkspaceOwnerId),
|
|
|
|
attribute.String("workspace_owner", build.Metadata.WorkspaceOwner),
|
|
|
|
attribute.String("workspace_transition", build.Metadata.WorkspaceTransition.String()),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2023-06-21 10:00:38 +00:00
|
|
|
p.opts.Logger.Debug(ctx, "acquired job", fields...)
|
2022-02-01 18:15:54 +00:00
|
|
|
|
2023-09-08 09:53:48 +00:00
|
|
|
respCh := make(chan ConnectResponse)
|
|
|
|
p.opts.Connector.Connect(ctx, job, respCh)
|
|
|
|
resp := <-respCh
|
|
|
|
if resp.Error != nil {
|
2022-07-01 16:55:46 +00:00
|
|
|
err := p.FailJob(ctx, &proto.FailedJob{
|
|
|
|
JobId: job.JobId,
|
2023-09-08 09:53:48 +00:00
|
|
|
Error: fmt.Sprintf("failed to connect to provisioner: %s", resp.Error),
|
2022-05-17 20:00:48 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
2023-06-21 10:00:38 +00:00
|
|
|
p.opts.Logger.Error(ctx, "provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err))
|
2022-05-17 20:00:48 +00:00
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
2022-09-16 16:43:22 +00:00
|
|
|
|
2023-09-19 06:25:57 +00:00
|
|
|
p.mutex.Lock()
|
2022-11-07 02:50:34 +00:00
|
|
|
p.activeJob = runner.New(
|
2022-09-16 16:43:22 +00:00
|
|
|
ctx,
|
|
|
|
job,
|
2022-11-11 22:45:58 +00:00
|
|
|
runner.Options{
|
|
|
|
Updater: p,
|
2022-11-14 17:57:33 +00:00
|
|
|
QuotaCommitter: p,
|
2023-08-19 17:56:08 +00:00
|
|
|
Logger: p.opts.Logger.Named("runner"),
|
2023-09-08 09:53:48 +00:00
|
|
|
Provisioner: resp.Client,
|
2022-11-11 22:45:58 +00:00
|
|
|
UpdateInterval: p.opts.UpdateInterval,
|
|
|
|
ForceCancelInterval: p.opts.ForceCancelInterval,
|
|
|
|
LogDebounceInterval: p.opts.LogBufferInterval,
|
|
|
|
Tracer: p.tracer,
|
|
|
|
Metrics: p.opts.Metrics.Runner,
|
|
|
|
},
|
2022-09-16 16:43:22 +00:00
|
|
|
)
|
2023-09-19 06:25:57 +00:00
|
|
|
p.mutex.Unlock()
|
|
|
|
p.activeJob.Run()
|
|
|
|
p.mutex.Lock()
|
|
|
|
p.activeJob = nil
|
|
|
|
p.mutex.Unlock()
|
|
|
|
}
|
2022-09-16 16:43:22 +00:00
|
|
|
|
2023-09-19 06:25:57 +00:00
|
|
|
// acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut
|
|
|
|
// down.
|
|
|
|
func (p *Server) acquireGraceful(client proto.DRPCProvisionerDaemonClient) (*proto.AcquiredJob, error) {
|
|
|
|
stream, err := client.AcquireJobWithCancel(p.closeContext)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
acquireDone := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-p.closeContext.Done():
|
|
|
|
return
|
|
|
|
case <-p.shuttingDownCh:
|
|
|
|
p.opts.Logger.Debug(p.closeContext, "sending acquire job cancel")
|
|
|
|
err := stream.Send(&proto.CancelAcquire{})
|
|
|
|
if err != nil {
|
|
|
|
p.opts.Logger.Warn(p.closeContext, "failed to gracefully cancel acquire job")
|
|
|
|
}
|
|
|
|
return
|
|
|
|
case <-acquireDone:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
job, err := stream.Recv()
|
|
|
|
close(acquireDone)
|
|
|
|
return job, err
|
2022-05-17 20:00:48 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 16:55:46 +00:00
|
|
|
func retryable(err error) bool {
|
2022-11-22 18:19:32 +00:00
|
|
|
return xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) || xerrors.Is(err, fasthttputil.ErrInmemoryListenerClosed) ||
|
2022-07-01 16:55:46 +00:00
|
|
|
// annoyingly, dRPC sometimes returns context.Canceled if the transport was closed, even if the context for
|
|
|
|
// the RPC *is not canceled*. Retrying is fine if the RPC context is not canceled.
|
|
|
|
xerrors.Is(err, context.Canceled)
|
2022-02-08 01:35:18 +00:00
|
|
|
}
|
|
|
|
|
2023-04-13 19:02:10 +00:00
|
|
|
// clientDoWithRetries runs the function f with a client, and retries with
|
|
|
|
// backoff until either the error returned is not retryable() or the context
|
|
|
|
// expires.
|
|
|
|
func clientDoWithRetries[T any](ctx context.Context,
|
|
|
|
getClient func() (proto.DRPCProvisionerDaemonClient, bool),
|
|
|
|
f func(context.Context, proto.DRPCProvisionerDaemonClient) (T, error),
|
|
|
|
) (ret T, _ error) {
|
2022-07-01 16:55:46 +00:00
|
|
|
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(ctx); {
|
2023-04-13 19:02:10 +00:00
|
|
|
client, ok := getClient()
|
2022-07-01 16:55:46 +00:00
|
|
|
if !ok {
|
|
|
|
continue
|
2022-02-01 18:15:54 +00:00
|
|
|
}
|
2022-07-01 16:55:46 +00:00
|
|
|
resp, err := f(ctx, client)
|
|
|
|
if retryable(err) {
|
|
|
|
continue
|
2022-02-08 01:35:18 +00:00
|
|
|
}
|
2022-07-01 16:55:46 +00:00
|
|
|
return resp, err
|
2022-02-08 01:35:18 +00:00
|
|
|
}
|
2023-04-13 19:02:10 +00:00
|
|
|
return ret, ctx.Err()
|
2022-02-08 01:35:18 +00:00
|
|
|
}
|
|
|
|
|
2022-11-14 17:57:33 +00:00
|
|
|
func (p *Server) CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
|
2023-04-13 19:02:10 +00:00
|
|
|
out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.CommitQuotaResponse, error) {
|
2022-11-14 17:57:33 +00:00
|
|
|
return client.CommitQuota(ctx, in)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-04-13 19:02:10 +00:00
|
|
|
return out, nil
|
2022-11-14 17:57:33 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 16:55:46 +00:00
|
|
|
func (p *Server) UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
|
2023-04-13 19:02:10 +00:00
|
|
|
out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.UpdateJobResponse, error) {
|
2022-07-01 16:55:46 +00:00
|
|
|
return client.UpdateJob(ctx, in)
|
2022-03-22 19:17:50 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
2022-07-01 16:55:46 +00:00
|
|
|
return nil, err
|
2022-02-01 18:15:54 +00:00
|
|
|
}
|
2023-04-13 19:02:10 +00:00
|
|
|
return out, nil
|
2022-02-01 18:15:54 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 16:55:46 +00:00
|
|
|
func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
|
2023-04-13 19:02:10 +00:00
|
|
|
_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
|
2022-07-01 16:55:46 +00:00
|
|
|
return client.FailJob(ctx, in)
|
2022-06-01 14:44:53 +00:00
|
|
|
})
|
2022-07-01 16:55:46 +00:00
|
|
|
return err
|
2022-06-01 14:44:53 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 16:55:46 +00:00
|
|
|
func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
|
2023-04-13 19:02:10 +00:00
|
|
|
_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
|
2022-07-01 16:55:46 +00:00
|
|
|
return client.CompleteJob(ctx, in)
|
2022-02-28 18:40:49 +00:00
|
|
|
})
|
2022-07-01 16:55:46 +00:00
|
|
|
return err
|
2022-02-01 18:15:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// isClosed returns whether the API is closed.
|
2022-02-28 18:40:49 +00:00
|
|
|
func (p *Server) isClosed() bool {
|
2022-02-01 18:15:54 +00:00
|
|
|
select {
|
2022-04-25 01:33:19 +00:00
|
|
|
case <-p.closeContext.Done():
|
2022-02-01 18:15:54 +00:00
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-15 13:16:36 +00:00
|
|
|
// Shutdown gracefully exits with the option to cancel the active job.
|
|
|
|
// If false, it will wait for the job to complete.
|
|
|
|
//
|
|
|
|
//nolint:revive
|
|
|
|
func (p *Server) Shutdown(ctx context.Context, cancelActiveJob bool) error {
|
2022-07-01 16:55:46 +00:00
|
|
|
p.mutex.Lock()
|
2022-02-28 18:40:49 +00:00
|
|
|
p.opts.Logger.Info(ctx, "attempting graceful shutdown")
|
2023-09-19 06:25:57 +00:00
|
|
|
if !p.shuttingDownB {
|
|
|
|
close(p.shuttingDownCh)
|
|
|
|
p.shuttingDownB = true
|
2022-07-01 16:55:46 +00:00
|
|
|
}
|
2024-03-15 13:16:36 +00:00
|
|
|
if cancelActiveJob && p.activeJob != nil {
|
2023-09-19 06:25:57 +00:00
|
|
|
p.activeJob.Cancel()
|
|
|
|
}
|
|
|
|
p.mutex.Unlock()
|
2022-02-28 18:40:49 +00:00
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
p.opts.Logger.Warn(ctx, "graceful shutdown failed", slog.Error(ctx.Err()))
|
|
|
|
return ctx.Err()
|
2023-09-19 06:25:57 +00:00
|
|
|
case <-p.acquireDoneCh:
|
2022-02-28 18:40:49 +00:00
|
|
|
p.opts.Logger.Info(ctx, "gracefully shut down")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close ends the provisioner. It will mark any running jobs as failed.
|
|
|
|
func (p *Server) Close() error {
|
2023-09-19 06:25:57 +00:00
|
|
|
p.opts.Logger.Info(p.closeContext, "closing provisionerd")
|
2022-02-01 18:15:54 +00:00
|
|
|
return p.closeWithError(nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// closeWithError closes the provisioner; subsequent reads/writes will return the error err.
|
2022-02-28 18:40:49 +00:00
|
|
|
func (p *Server) closeWithError(err error) error {
|
2022-07-01 16:55:46 +00:00
|
|
|
p.mutex.Lock()
|
2023-09-19 06:25:57 +00:00
|
|
|
var activeJob *runner.Runner
|
|
|
|
first := false
|
|
|
|
if !p.closingB {
|
|
|
|
first = true
|
|
|
|
p.closingB = true
|
|
|
|
// only the first caller to close should attempt to fail the active job
|
|
|
|
activeJob = p.activeJob
|
|
|
|
}
|
|
|
|
// don't hold the mutex while doing I/O.
|
|
|
|
p.mutex.Unlock()
|
|
|
|
if activeJob != nil {
|
|
|
|
errMsg := "provisioner daemon was shut down gracefully"
|
|
|
|
if err != nil {
|
|
|
|
errMsg = err.Error()
|
|
|
|
}
|
|
|
|
p.opts.Logger.Debug(p.closeContext, "failing active job because of close")
|
2022-07-01 16:55:46 +00:00
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
defer cancel()
|
2023-09-19 06:25:57 +00:00
|
|
|
failErr := activeJob.Fail(ctx, &proto.FailedJob{Error: errMsg})
|
2022-07-01 16:55:46 +00:00
|
|
|
if failErr != nil {
|
2023-09-19 06:25:57 +00:00
|
|
|
activeJob.ForceStop()
|
2022-07-01 16:55:46 +00:00
|
|
|
}
|
|
|
|
if err == nil {
|
|
|
|
err = failErr
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-19 06:25:57 +00:00
|
|
|
if first {
|
|
|
|
p.closeCancel()
|
|
|
|
p.opts.Logger.Debug(context.Background(), "waiting for goroutines to exit")
|
|
|
|
p.wg.Wait()
|
|
|
|
p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))
|
|
|
|
p.closeError = err
|
|
|
|
close(p.closedCh)
|
|
|
|
return err
|
2023-02-14 14:57:48 +00:00
|
|
|
}
|
2023-09-19 06:25:57 +00:00
|
|
|
p.opts.Logger.Debug(p.closeContext, "waiting for first closer to complete")
|
|
|
|
<-p.closedCh
|
|
|
|
p.opts.Logger.Debug(p.closeContext, "first closer completed")
|
|
|
|
return p.closeError
|
2022-02-01 18:15:54 +00:00
|
|
|
}
|