mirror of https://github.com/coder/coder.git
fix: use background context for inmem provisionerd (#11545)
This test case fails with an error log, showing "context canceled" when trying to send an acquired job to an in-mem provisionerd. https://github.com/coder/coder/runs/20331469006 In this case, we don't want to supress this error, since it could mean that we acquired a job, locked it in the database, then failed to send it to a provisioner. (We also don't want to mark the job as failed because we don't know whether the job made it to the provisionerd or not --- in the failed test you can see that the job is actually processed just fine). The reason we got context canceled is because the API was shutting down --- we don't want provisionerdserver to abruptly stop processing job stuff as the API shuts down as this will leave jobs in a bad state. This PR fixes up the use of contexts with provisionerdserver and the associated drpc service calls.
This commit is contained in:
parent
c125206b24
commit
dfe8efc186
|
@ -1371,10 +1371,10 @@ func newProvisionerDaemon(
|
||||||
connector[string(database.ProvisionerTypeTerraform)] = sdkproto.NewDRPCProvisionerClient(terraformClient)
|
connector[string(database.ProvisionerTypeTerraform)] = sdkproto.NewDRPCProvisionerClient(terraformClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
|
return provisionerd.New(func(dialCtx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
|
||||||
// This debounces calls to listen every second. Read the comment
|
// This debounces calls to listen every second. Read the comment
|
||||||
// in provisionerdserver.go to learn more!
|
// in provisionerdserver.go to learn more!
|
||||||
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, name)
|
return coderAPI.CreateInMemoryProvisionerDaemon(dialCtx, name)
|
||||||
}, &provisionerd.Options{
|
}, &provisionerd.Options{
|
||||||
Logger: logger.Named(fmt.Sprintf("provisionerd-%s", name)),
|
Logger: logger.Named(fmt.Sprintf("provisionerd-%s", name)),
|
||||||
UpdateInterval: time.Second,
|
UpdateInterval: time.Second,
|
||||||
|
|
|
@ -1174,7 +1174,7 @@ func compressHandler(h http.Handler) http.Handler {
|
||||||
|
|
||||||
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
|
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
|
||||||
// Useful when starting coderd and provisionerd in the same process.
|
// Useful when starting coderd and provisionerd in the same process.
|
||||||
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string) (client proto.DRPCProvisionerDaemonClient, err error) {
|
func (api *API) CreateInMemoryProvisionerDaemon(dialCtx context.Context, name string) (client proto.DRPCProvisionerDaemonClient, err error) {
|
||||||
tracer := api.TracerProvider.Tracer(tracing.TracerName)
|
tracer := api.TracerProvider.Tracer(tracing.TracerName)
|
||||||
clientSession, serverSession := drpc.MemTransportPipe()
|
clientSession, serverSession := drpc.MemTransportPipe()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -1185,7 +1185,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
|
||||||
}()
|
}()
|
||||||
|
|
||||||
//nolint:gocritic // in-memory provisioners are owned by system
|
//nolint:gocritic // in-memory provisioners are owned by system
|
||||||
daemon, err := api.Database.UpsertProvisionerDaemon(dbauthz.AsSystemRestricted(ctx), database.UpsertProvisionerDaemonParams{
|
daemon, err := api.Database.UpsertProvisionerDaemon(dbauthz.AsSystemRestricted(dialCtx), database.UpsertProvisionerDaemonParams{
|
||||||
Name: name,
|
Name: name,
|
||||||
CreatedAt: dbtime.Now(),
|
CreatedAt: dbtime.Now(),
|
||||||
Provisioners: []database.ProvisionerType{
|
Provisioners: []database.ProvisionerType{
|
||||||
|
@ -1201,7 +1201,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
|
||||||
}
|
}
|
||||||
|
|
||||||
mux := drpcmux.New()
|
mux := drpcmux.New()
|
||||||
api.Logger.Info(ctx, "starting in-memory provisioner daemon", slog.F("name", name))
|
api.Logger.Info(dialCtx, "starting in-memory provisioner daemon", slog.F("name", name))
|
||||||
logger := api.Logger.Named(fmt.Sprintf("inmem-provisionerd-%s", name))
|
logger := api.Logger.Named(fmt.Sprintf("inmem-provisionerd-%s", name))
|
||||||
srv, err := provisionerdserver.NewServer(
|
srv, err := provisionerdserver.NewServer(
|
||||||
api.ctx, // use the same ctx as the API
|
api.ctx, // use the same ctx as the API
|
||||||
|
@ -1238,13 +1238,25 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
|
||||||
if xerrors.Is(err, io.EOF) {
|
if xerrors.Is(err, io.EOF) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Debug(ctx, "drpc server error", slog.Error(err))
|
logger.Debug(dialCtx, "drpc server error", slog.Error(err))
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
// in-mem pipes aren't technically "websockets" but they have the same properties as far as the
|
||||||
|
// API is concerned: they are long-lived connections that we need to close before completing
|
||||||
|
// shutdown of the API.
|
||||||
|
api.WebsocketWaitMutex.Lock()
|
||||||
|
api.WebsocketWaitGroup.Add(1)
|
||||||
|
api.WebsocketWaitMutex.Unlock()
|
||||||
go func() {
|
go func() {
|
||||||
err := server.Serve(ctx, serverSession)
|
defer api.WebsocketWaitGroup.Done()
|
||||||
logger.Info(ctx, "provisioner daemon disconnected", slog.Error(err))
|
// here we pass the background context, since we want the server to keep serving until the
|
||||||
|
// client hangs up. If we, say, pass the API context, then when it is canceled, we could
|
||||||
|
// drop a job that we locked in the database but never passed to the provisionerd. The
|
||||||
|
// provisionerd is local, in-mem, so there isn't a danger of losing contact with it and
|
||||||
|
// having a dead connection we don't know the status of.
|
||||||
|
err := server.Serve(context.Background(), serverSession)
|
||||||
|
logger.Info(dialCtx, "provisioner daemon disconnected", slog.Error(err))
|
||||||
// close the sessions, so we don't leak goroutines serving them.
|
// close the sessions, so we don't leak goroutines serving them.
|
||||||
_ = clientSession.Close()
|
_ = clientSession.Close()
|
||||||
_ = serverSession.Close()
|
_ = serverSession.Close()
|
||||||
|
|
|
@ -532,8 +532,8 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
daemon := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
|
daemon := provisionerd.New(func(dialCtx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
|
||||||
return coderAPI.CreateInMemoryProvisionerDaemon(ctx, "test")
|
return coderAPI.CreateInMemoryProvisionerDaemon(dialCtx, "test")
|
||||||
}, &provisionerd.Options{
|
}, &provisionerd.Options{
|
||||||
Logger: coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
|
Logger: coderAPI.Logger.Named("provisionerd").Leveled(slog.LevelDebug),
|
||||||
UpdateInterval: 250 * time.Millisecond,
|
UpdateInterval: 250 * time.Millisecond,
|
||||||
|
|
Loading…
Reference in New Issue