feat: provisionerd tracing, add public trace ingestion (#4070)

Colin Adler 2022-09-16 11:43:22 -05:00 committed by GitHub
parent fc841898cd
commit 77acf0c340
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 380 additions and 168 deletions

View File

@ -34,7 +34,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/afero"
"github.com/spf13/cobra"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/oauth2"
xgithub "golang.org/x/oauth2/github"
"golang.org/x/sync/errgroup"
@ -115,7 +115,7 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
turnRelayAddress string
tunnel bool
stunServers []string
trace bool
traceEnable bool
secureAuthCookie bool
sshKeygenAlgorithmRaw string
autoImportTemplates []string
@ -159,26 +159,32 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
defer http.DefaultClient.CloseIdleConnections()
var (
tracerProvider *sdktrace.TracerProvider
tracerProvider trace.TracerProvider
err error
sqlDriver = "postgres"
)
if trace {
tracerProvider, err = tracing.TracerProvider(ctx, "coderd")
if traceEnable || telemetryEnable {
sdkTracerProvider, err := tracing.TracerProvider(ctx, "coderd", tracing.TracerOpts{
Default: traceEnable,
Coder: telemetryEnable && !isTest(),
})
if err != nil {
logger.Warn(ctx, "failed to start telemetry exporter", slog.Error(err))
logger.Warn(ctx, "start telemetry exporter", slog.Error(err))
} else {
// allow time for traces to flush even if command context is canceled
defer func() {
_ = shutdownWithTimeout(tracerProvider, 5*time.Second)
_ = shutdownWithTimeout(sdkTracerProvider, 5*time.Second)
}()
d, err := tracing.PostgresDriver(tracerProvider, "coderd.database")
d, err := tracing.PostgresDriver(sdkTracerProvider, "coderd.database")
if err != nil {
logger.Warn(ctx, "failed to start postgres tracing driver", slog.Error(err))
logger.Warn(ctx, "start postgres tracing driver", slog.Error(err))
} else {
sqlDriver = d
}
tracerProvider = sdkTracerProvider
}
}
@ -838,7 +844,7 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
cliflag.StringArrayVarP(root.Flags(), &stunServers, "stun-server", "", "CODER_STUN_SERVERS", []string{
"stun:stun.l.google.com:19302",
}, "Specify URLs for STUN servers to enable P2P connections.")
cliflag.BoolVarP(root.Flags(), &trace, "trace", "", "CODER_TRACE", false, "Specifies if application tracing data is collected")
cliflag.BoolVarP(root.Flags(), &traceEnable, "trace", "", "CODER_TRACE", false, "Specifies if application tracing data is collected")
cliflag.StringVarP(root.Flags(), &turnRelayAddress, "turn-relay-address", "", "CODER_TURN_RELAY_ADDRESS", "127.0.0.1",
"Specifies the address to bind TURN connections.")
cliflag.BoolVarP(root.Flags(), &secureAuthCookie, "secure-auth-cookie", "", "CODER_SECURE_AUTH_COOKIE", false, "Specifies if the 'Secure' property is set on browser session cookies")
@ -915,8 +921,13 @@ func shutdownWithTimeout(s interface{ Shutdown(context.Context) error }, timeout
}
// nolint:revive
func newProvisionerDaemon(ctx context.Context, coderAPI *coderd.API,
logger slog.Logger, cacheDir string, errCh chan error, dev bool,
func newProvisionerDaemon(
ctx context.Context,
coderAPI *coderd.API,
logger slog.Logger,
cacheDir string,
errCh chan error,
dev bool,
) (srv *provisionerd.Server, err error) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
@ -989,6 +1000,7 @@ func newProvisionerDaemon(ctx context.Context, coderAPI *coderd.API,
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
Tracer: coderAPI.TracerProvider,
}), nil
}

View File

@ -15,7 +15,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
"google.golang.org/api/idtoken"
"tailscale.com/derp"
@ -70,7 +70,7 @@ type Options struct {
SSHKeygenAlgorithm gitsshkey.Algorithm
Telemetry telemetry.Reporter
TURNServer *turnconn.Server
TracerProvider *sdktrace.TracerProvider
TracerProvider trace.TracerProvider
AutoImportTemplates []AutoImportTemplate
LicenseHandler http.Handler
FeaturesService features.Service

View File

@ -5,37 +5,74 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"golang.org/x/xerrors"
)
// TracerOpts specifies which telemetry exporters should be configured.
type TracerOpts struct {
// Default exports to a backend configured by environment variables. See:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
Default bool
// Coder exports traces to Coder's public tracing ingest service and is used
// to improve the product. It is disabled when opting out of telemetry.
Coder bool
}
// TracerProvider creates a grpc otlp exporter and configures a trace provider.
// Caller is responsible for calling TracerProvider.Shutdown to ensure all data is flushed.
func TracerProvider(ctx context.Context, service string) (*sdktrace.TracerProvider, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceNameKey.String(service),
),
func TracerProvider(ctx context.Context, service string, opts TracerOpts) (*sdktrace.TracerProvider, error) {
res := resource.NewWithAttributes(
semconv.SchemaURL,
// the service name used to display traces in backends
semconv.ServiceNameKey.String(service),
)
if err != nil {
return nil, xerrors.Errorf("creating otlp resource: %w", err)
}
// By default we send span data to a local otel collector.
// The endpoint we push to can be configured with env vars.
// See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
exporter, err := otlptrace.New(ctx, otlptracegrpc.NewClient(otlptracegrpc.WithInsecure()))
if err != nil {
return nil, xerrors.Errorf("creating otlp exporter: %w", err)
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
tracerOpts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
)
}
if opts.Default {
exporter, err := DefaultExporter(ctx)
if err != nil {
return nil, xerrors.Errorf("default exporter: %w", err)
}
tracerOpts = append(tracerOpts, sdktrace.WithBatcher(exporter))
}
if opts.Coder {
exporter, err := CoderExporter(ctx)
if err != nil {
return nil, xerrors.Errorf("coder exporter: %w", err)
}
tracerOpts = append(tracerOpts, sdktrace.WithBatcher(exporter))
}
tracerProvider := sdktrace.NewTracerProvider(tracerOpts...)
return tracerProvider, nil
}
func DefaultExporter(ctx context.Context) (*otlptrace.Exporter, error) {
exporter, err := otlptrace.New(ctx, otlptracegrpc.NewClient(otlptracegrpc.WithInsecure()))
if err != nil {
return nil, xerrors.Errorf("create otlp exporter: %w", err)
}
return exporter, nil
}
func CoderExporter(ctx context.Context) (*otlptrace.Exporter, error) {
opts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint("oss-otel-ingest-http.coder.app:443"),
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
}
exporter, err := otlptrace.New(ctx, otlptracehttp.NewClient(opts...))
if err != nil {
return nil, xerrors.Errorf("create otlp exporter: %w", err)
}
return exporter, nil
}
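
Not part of the commit — a minimal sketch of how the reworked constructor might be called from another program. TracerOpts selects the exporters (Default honors the standard OTLP environment variables, Coder ships spans to Coder's public ingest endpoint), and the caller is responsible for flushing via Shutdown, mirroring shutdownWithTimeout in the CLI above. The service name and the standalone main are illustrative only.

package main

import (
    "context"
    "log"
    "time"

    "github.com/coder/coder/coderd/tracing"
)

func main() {
    ctx := context.Background()

    // Pick exporters explicitly; with both fields false the provider has no
    // exporters and spans are dropped.
    tp, err := tracing.TracerProvider(ctx, "coderd", tracing.TracerOpts{
        Default: true,
        Coder:   false,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        // Flush in-flight spans even if ctx is already canceled.
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        _ = tp.Shutdown(shutdownCtx)
    }()

    _, span := tp.Tracer("example").Start(ctx, "do-something")
    span.End()
}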

View File

@ -5,7 +5,6 @@ import (
"net/http"
"github.com/go-chi/chi/v5"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
@ -13,7 +12,7 @@ import (
)
// HTTPMW adds tracing to http routes.
func HTTPMW(tracerProvider *sdktrace.TracerProvider, name string) func(http.Handler) http.Handler {
func HTTPMW(tracerProvider trace.TracerProvider, name string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if tracerProvider == nil {
@ -34,20 +33,15 @@ func HTTPMW(tracerProvider *sdktrace.TracerProvider, name string) func(http.Hand
// pass the span through the request context and serve the request to the next middleware
next.ServeHTTP(sw, r)
// capture response data
EndHTTPSpan(r, sw.Status)
EndHTTPSpan(r, sw.Status, span)
})
}
}
// EndHTTPSpan captures request and response data after the handler is done.
func EndHTTPSpan(r *http.Request, status int) {
span := trace.SpanFromContext(r.Context())
func EndHTTPSpan(r *http.Request, status int, span trace.Span) {
// set the resource name as we get it only once the handler is executed
route := chi.RouteContext(r.Context()).RoutePattern()
if route != "" {
span.SetName(fmt.Sprintf("%s %s", r.Method, route))
}
span.SetName(fmt.Sprintf("%s %s", r.Method, route))
span.SetAttributes(semconv.NetAttributesFromHTTPRequest("tcp", r)...)
span.SetAttributes(semconv.EndUserAttributesFromHTTPRequest(r)...)
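
Not part of the commit — a minimal sketch of the middleware together with the changed EndHTTPSpan signature, which now takes the span explicitly instead of re-deriving it from the request context. The route, middleware name, and listen address are illustrative; the noop provider stands in for a real one.

package main

import (
    "net/http"

    "github.com/go-chi/chi/v5"
    "go.opentelemetry.io/otel/trace"

    "github.com/coder/coder/coderd/tracing"
)

func main() {
    tracerProvider := trace.NewNoopTracerProvider()

    r := chi.NewRouter()
    r.Use(tracing.HTTPMW(tracerProvider, "coderd"))
    r.Get("/ws", func(rw http.ResponseWriter, req *http.Request) {
        // For long-lived connections, end the HTTP span as soon as the
        // upgrade succeeds so the trace doesn't stay open for the lifetime
        // of the socket, as the workspace agent endpoints below do.
        tracing.EndHTTPSpan(req, http.StatusOK, trace.SpanFromContext(req.Context()))
        // ... hijack/upgrade and serve the connection ...
    })

    _ = http.ListenAndServe("127.0.0.1:3000", r)
}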

View File

@ -6,7 +6,7 @@ import (
"strings"
"github.com/nhatthm/otelsql"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
)

19
coderd/tracing/util.go Normal file
View File

@ -0,0 +1,19 @@
package tracing
import (
"runtime"
"strings"
)
func FuncName() string {
fnpc, _, _, ok := runtime.Caller(1)
if !ok {
return ""
}
fn := runtime.FuncForPC(fnpc)
name := fn.Name()
if i := strings.LastIndex(name, "/"); i > 0 {
name = name[i+1:]
}
return name
}

View File

@ -0,0 +1,39 @@
package tracing_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/coder/coder/coderd/tracing"
)
// t.Parallel affects the result of these tests.
//nolint:paralleltest
func TestFuncName(t *testing.T) {
fn := tracing.FuncName()
assert.Equal(t, "tracing_test.TestFuncName", fn)
}
type foo struct{}
func (foo) bar() string {
return tracing.FuncName()
}
//nolint:paralleltest
func TestFuncNameMethod(t *testing.T) {
fn := foo{}.bar()
assert.Equal(t, "tracing_test.foo.bar", fn)
}
func (*foo) baz() string {
return tracing.FuncName()
}
//nolint:paralleltest
func TestFuncNameMethodPointer(t *testing.T) {
fn := (&foo{}).baz()
assert.Equal(t, "tracing_test.(*foo).baz", fn)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/yamux"
"go.opentelemetry.io/otel/trace"
"golang.org/x/mod/semver"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
@ -113,7 +114,7 @@ func (api *API) workspaceAgentDial(rw http.ResponseWriter, r *http.Request) {
}
// end span so we don't get long lived trace data
tracing.EndHTTPSpan(r, 200)
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
err = peerbroker.ProxyListen(ctx, session, peerbroker.ProxyOptions{
ChannelID: workspaceAgent.ID.String(),
@ -309,7 +310,7 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) {
}
// end span so we don't get long lived trace data
tracing.EndHTTPSpan(r, 200)
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
api.Logger.Info(ctx, "accepting agent", slog.F("resource", resource), slog.F("agent", workspaceAgent))
@ -398,8 +399,9 @@ func (api *API) workspaceAgentTurn(rw http.ResponseWriter, r *http.Request) {
}
ctx, wsNetConn := websocketNetConn(r.Context(), wsConn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.
tracing.EndHTTPSpan(r, 200) // end span so we don't get long lived trace data
defer wsNetConn.Close() // Also closes conn.
// end span so we don't get long lived trace data
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
api.Logger.Debug(ctx, "accepting turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress))
select {

View File

@ -9,6 +9,7 @@ import (
"strings"
"github.com/go-chi/chi/v5"
"go.opentelemetry.io/otel/trace"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/httpapi"
@ -125,6 +126,7 @@ type proxyApplication struct {
}
func (api *API) proxyWorkspaceApplication(proxyApp proxyApplication, rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if !api.Authorize(r, rbac.ActionCreate, proxyApp.Workspace.ExecutionRBAC()) {
httpapi.ResourceNotFound(rw)
return
@ -138,7 +140,7 @@ func (api *API) proxyWorkspaceApplication(proxyApp proxyApplication, rw http.Res
// If the app name was used instead, fetch the app from the database so we
// can get the internal URL.
if proxyApp.AppName != "" {
app, err := api.Database.GetWorkspaceAppByAgentIDAndName(r.Context(), database.GetWorkspaceAppByAgentIDAndNameParams{
app, err := api.Database.GetWorkspaceAppByAgentIDAndName(ctx, database.GetWorkspaceAppByAgentIDAndNameParams{
AgentID: proxyApp.Agent.ID,
Name: proxyApp.AppName,
})
@ -195,7 +197,7 @@ func (api *API) proxyWorkspaceApplication(proxyApp proxyApplication, rw http.Res
if proxyApp.DashboardOnError {
// To pass friendly errors to the frontend, special meta tags are
// overridden in the index.html with the content passed here.
r = r.WithContext(site.WithAPIResponse(r.Context(), site.APIResponse{
r = r.WithContext(site.WithAPIResponse(ctx, site.APIResponse{
StatusCode: http.StatusBadGateway,
Message: err.Error(),
}))
@ -228,7 +230,7 @@ func (api *API) proxyWorkspaceApplication(proxyApp proxyApplication, rw http.Res
proxy.Transport = conn.HTTPTransport()
// end span so we don't get long lived trace data
tracing.EndHTTPSpan(r, 200)
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
proxy.ServeHTTP(rw, r)
}

15
go.mod
View File

@ -139,11 +139,12 @@ require (
github.com/u-root/u-root v0.9.0
github.com/unrolled/secure v1.13.0
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1
go.opentelemetry.io/otel v1.9.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0
go.opentelemetry.io/otel/sdk v1.9.0
go.opentelemetry.io/otel/trace v1.9.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0
go.opentelemetry.io/otel/sdk v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.2.0
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167
@ -299,9 +300,9 @@ require (
github.com/zclconf/go-cty v1.10.0 // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/proto/otlp v0.18.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go4.org/mem v0.0.0-20210711025021-927187094b94 // indirect
go4.org/netipx v0.0.0-20220725152314-7e7bdc8411bf
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect

30
go.sum
View File

@ -1933,22 +1933,24 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0/go.mod h1:
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw=
go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo=
go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4=
go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.3.0/go.mod h1:VpP4/RMn8bv8gNo9uK7/IMY4mtWLELsS+JIP0inH0h4=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0/go.mod h1:M1hVZHNxcbkAlcvrOMlpQ4YOO3Awf+4N2dxkZL3xm04=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 h1:ggqApEjDKczicksfvZUCxuvoyDmR6Sbm56LwiK8DVR0=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 h1:TaB+1rQhddO1sF71MpZOZAuSPW1klK2M8XxfrBMfK7Y=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.3.0/go.mod h1:hO1KLR7jcKaDDKDkvI9dP/FIhpmna5lkqPUQdEjFAM8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0/go.mod h1:ceUgdyfNv4h4gLxHR0WNfDiiVmZFodZhZSbOLhpxqXE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 h1:NN90Cuna0CnBg8YNu1Q0V35i2E8LDByFOwHRCq/ZP9I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0/go.mod h1:0EsCXjZAiiZGnLdEUXM9YjCKuuLZMYyglh2QDXcYKVA=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 h1:pDDYmo0QadUPal5fwXoY1pmMpFcdyhXOmL5drCrI3vU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0/go.mod h1:Krqnjl22jUJ0HgMzw5eveuCvFDXY4nSYb4F8t5gdrag=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.3.0/go.mod h1:keUU7UfnwWTWpJ+FWnyqmogPa82nuU5VUANFq49hlMY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0/go.mod h1:E+/KKhwOSw8yoPxSSuUHG6vKppkvhN+S1Jc7Nib3k3o=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0 h1:M0/hqGuJBLeIEu20f89H74RGtqV2dn+SFWEz9ATAAwY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0/go.mod h1:K5G92gbtCrYJ0mn6zj9Pst7YFsDFuvSYEhYKRMcufnM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 h1:KtiUEhQmj/Pa874bVYKGNVdq8NPKiacPbaRRtgXi+t4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0/go.mod h1:OfUCyyIiDvNXHWpcWgbF+MWvqPZiNa3YDEnivcnYsV0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.3.0/go.mod h1:QNX1aly8ehqqX1LEa6YniTU7VY9I6R3X/oPxhGdTceE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0 h1:S8DedULB3gp93Rh+9Z+7NTEv+6Id/KYS7LDyipZ9iCE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0/go.mod h1:5WV40MLWwvWlGP7Xm8g3pMcg0pKOUY609qxJn8y7LmM=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.31.0 h1:fu/wxbXqjgIRZYzQNrF175qtwrJx+oQSFhZpTIbNQLc=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.8.0 h1:FVy7BZCjoA2Nk+fHqIdoTmm554J9wTX+YcrDp+mc368=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
@ -1959,21 +1961,21 @@ go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa
go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc=
go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs=
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo=
go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4=
go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY=
go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE=
go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE=
go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
go.opentelemetry.io/otel/sdk/metric v0.31.0 h1:2sZx4R43ZMhJdteKAlKoHvRgrMp53V1aRxvEf5lCq8Q=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E=
go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.11.0/go.mod h1:QpEjXPrNQzrFDZgoTo49dgHR9RYRSrg3NAKnUGl9YpQ=
go.opentelemetry.io/proto/otlp v0.16.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.18.0 h1:W5hyXNComRa23tGpKwG+FRAc4rfF6ZUg1JReK+QHS80=
go.opentelemetry.io/proto/otlp v0.18.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=

View File

@ -5,17 +5,21 @@ import (
"errors"
"fmt"
"io"
"reflect"
"strings"
"sync"
"time"
"github.com/hashicorp/yamux"
"github.com/spf13/afero"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionerd/runner"
sdkproto "github.com/coder/coder/provisionersdk/proto"
@ -39,6 +43,7 @@ type Provisioners map[string]sdkproto.DRPCProvisionerClient
type Options struct {
Filesystem afero.Fs
Logger slog.Logger
Tracer trace.TracerProvider
ForceCancelInterval time.Duration
UpdateInterval time.Duration
@ -61,10 +66,16 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts.Filesystem == nil {
opts.Filesystem = afero.NewOsFs()
}
if opts.Tracer == nil {
opts.Tracer = trace.NewNoopTracerProvider()
}
ctx, ctxCancel := context.WithCancel(context.Background())
daemon := &Server{
opts: opts,
tracer: opts.Tracer.Tracer("provisionerd"),
clientDialer: clientDialer,
opts: opts,
closeContext: ctx,
closeCancel: ctxCancel,
@ -77,7 +88,8 @@ func New(clientDialer Dialer, opts *Options) *Server {
}
type Server struct {
opts *Options
opts *Options
tracer trace.Tracer
clientDialer Dialer
clientValue atomic.Value
@ -196,11 +208,13 @@ func (p *Server) acquireJob(ctx context.Context) {
p.opts.Logger.Debug(context.Background(), "skipping acquire; provisionerd is shutting down...")
return
}
var err error
client, ok := p.client()
if !ok {
return
}
job, err := client.AcquireJob(ctx, &proto.Empty{})
if err != nil {
if errors.Is(err, context.Canceled) {
@ -209,13 +223,37 @@ func (p *Server) acquireJob(ctx context.Context) {
if errors.Is(err, yamux.ErrSessionShutdown) {
return
}
p.opts.Logger.Warn(context.Background(), "acquire job", slog.Error(err))
p.opts.Logger.Warn(ctx, "acquire job", slog.Error(err))
return
}
if job.JobId == "" {
return
}
p.opts.Logger.Info(context.Background(), "acquired job",
ctx, span := p.tracer.Start(ctx, tracing.FuncName(), trace.WithAttributes(
semconv.ServiceNameKey.String("coderd.provisionerd"),
attribute.String("job_id", job.JobId),
attribute.String("job_type", reflect.TypeOf(job.GetType()).Elem().Name()),
attribute.Int64("job_created_at", job.CreatedAt),
attribute.String("initiator_username", job.UserName),
attribute.String("provisioner", job.Provisioner),
attribute.Int("template_size_bytes", len(job.TemplateSourceArchive)),
))
defer span.End()
if build := job.GetWorkspaceBuild(); build != nil {
span.SetAttributes(
attribute.String("workspace_build_id", build.WorkspaceBuildId),
attribute.String("workspace_id", build.Metadata.WorkspaceId),
attribute.String("workspace_name", build.WorkspaceName),
attribute.String("workspace_owner_id", build.Metadata.WorkspaceOwnerId),
attribute.String("workspace_owner", build.Metadata.WorkspaceOwner),
attribute.String("workspace_transition", build.Metadata.WorkspaceTransition.String()),
)
}
p.opts.Logger.Info(ctx, "acquired job",
slog.F("initiator_username", job.UserName),
slog.F("provisioner", job.Provisioner),
slog.F("job_id", job.JobId),
@ -228,13 +266,24 @@ func (p *Server) acquireJob(ctx context.Context) {
Error: fmt.Sprintf("no provisioner %s", job.Provisioner),
})
if err != nil {
p.opts.Logger.Error(context.Background(), "failed to call FailJob",
slog.F("job_id", job.JobId), slog.Error(err))
p.opts.Logger.Error(ctx, "fail job", slog.F("job_id", job.JobId), slog.Error(err))
}
return
}
p.activeJob = runner.NewRunner(job, p, p.opts.Logger, p.opts.Filesystem, p.opts.WorkDirectory, provisioner,
p.opts.UpdateInterval, p.opts.ForceCancelInterval)
p.activeJob = runner.NewRunner(
ctx,
job,
p,
p.opts.Logger,
p.opts.Filesystem,
p.opts.WorkDirectory,
provisioner,
p.opts.UpdateInterval,
p.opts.ForceCancelInterval,
p.tracer,
)
go p.activeJob.Run()
}
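
Not part of the diff — a hedged sketch of what the new Options.Tracer field looks like to an embedder. The CLI passes coderAPI.TracerProvider; leaving the field nil makes New fall back to trace.NewNoopTracerProvider(), so tracing stays opt-in. The dialer and provisioner map are elided and the other values are placeholders, so this only shows the configuration shape.

package main

import (
    "os"
    "time"

    "cdr.dev/slog"
    "github.com/spf13/afero"
    "go.opentelemetry.io/otel/trace"

    "github.com/coder/coder/provisionerd"
)

func main() {
    opts := &provisionerd.Options{
        Logger:         slog.Make(), // placeholder; attach real sinks in practice
        Filesystem:     afero.NewOsFs(),
        WorkDirectory:  os.TempDir(),
        UpdateInterval: 500 * time.Millisecond,
        // Pass the same provider coderd uses, or leave nil for a no-op.
        Tracer: trace.NewNoopTracerProvider(),
    }
    _ = opts // hand opts to provisionerd.New alongside a client dialer
}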

View File

@ -17,10 +17,13 @@ import (
"github.com/google/uuid"
"github.com/spf13/afero"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/provisionerd/proto"
sdkproto "github.com/coder/coder/provisionersdk/proto"
)
@ -30,6 +33,7 @@ const (
)
type Runner struct {
tracer trace.Tracer
job *proto.AcquiredJob
sender JobUpdater
logger slog.Logger
@ -40,7 +44,7 @@ type Runner struct {
forceCancelInterval time.Duration
// closed when the Runner is finished sending any updates/failed/complete.
done chan any
done chan struct{}
// active as long as we are not canceled
notCanceled context.Context
cancel context.CancelFunc
@ -68,6 +72,7 @@ type JobUpdater interface {
}
func NewRunner(
ctx context.Context,
job *proto.AcquiredJob,
updater JobUpdater,
logger slog.Logger,
@ -75,18 +80,20 @@ func NewRunner(
workDirectory string,
provisioner sdkproto.DRPCProvisionerClient,
updateInterval time.Duration,
forceCancelInterval time.Duration) *Runner {
forceCancelInterval time.Duration,
tracer trace.Tracer,
) *Runner {
m := new(sync.Mutex)
// we need to create our contexts here in case a call to Cancel() comes immediately.
logCtx := slog.With(context.Background(), slog.F("job_id", job.JobId))
forceStopContext, forceStopFunc := context.WithCancel(logCtx)
forceStopContext, forceStopFunc := context.WithCancel(ctx)
gracefulContext, cancelFunc := context.WithCancel(forceStopContext)
return &Runner{
tracer: tracer,
job: job,
sender: updater,
logger: logger,
logger: logger.With(slog.F("job_id", job.JobId)),
filesystem: filesystem,
workDirectory: workDirectory,
provisioner: provisioner,
@ -94,7 +101,7 @@ func NewRunner(
forceCancelInterval: forceCancelInterval,
mutex: m,
cond: sync.NewCond(m),
done: make(chan any),
done: make(chan struct{}),
okToSend: true,
notStopped: forceStopContext,
stop: forceStopFunc,
@ -103,7 +110,7 @@ func NewRunner(
}
}
// Run the job.
// Run executes the job.
//
// the idea here is to run two goroutines to work on the job: doCleanFinish and heartbeat, then use
// the `r.cond` to wait until the job is either complete or failed. This function then sends the
@ -113,12 +120,15 @@ func NewRunner(
// that goroutine on the context passed into Fail(), and it marks okToSend false to signal us here
// that this function should not also send a terminal message.
func (r *Runner) Run() {
ctx, span := r.startTrace(r.notStopped, tracing.FuncName())
defer span.End()
r.mutex.Lock()
defer r.mutex.Unlock()
defer r.stop()
go r.doCleanFinish()
go r.heartbeat()
go r.doCleanFinish(ctx)
go r.heartbeat(ctx)
for r.failedJob == nil && r.completedJob == nil {
r.cond.Wait()
}
@ -127,19 +137,24 @@ func (r *Runner) Run() {
return
}
if r.failedJob != nil {
r.logger.Debug(r.notStopped, "sending FailedJob")
err := r.sender.FailJob(r.notStopped, r.failedJob)
span.RecordError(xerrors.New(r.failedJob.Error))
span.SetStatus(codes.Error, r.failedJob.Error)
r.logger.Debug(ctx, "sending FailedJob")
err := r.sender.FailJob(ctx, r.failedJob)
if err != nil {
r.logger.Error(r.notStopped, "send FailJob", slog.Error(err))
r.logger.Error(ctx, "send FailJob", slog.Error(err))
} else {
r.logger.Info(ctx, "sent FailedJob")
}
r.logger.Info(r.notStopped, "sent FailedJob")
} else {
r.logger.Debug(r.notStopped, "sending CompletedJob")
err := r.sender.CompleteJob(r.notStopped, r.completedJob)
r.logger.Debug(ctx, "sending CompletedJob")
err := r.sender.CompleteJob(ctx, r.completedJob)
if err != nil {
r.logger.Error(r.notStopped, "send CompletedJob", slog.Error(err))
r.logger.Error(ctx, "send CompletedJob", slog.Error(err))
} else {
r.logger.Info(ctx, "sent CompletedJob")
}
r.logger.Info(r.notStopped, "sent CompletedJob")
}
close(r.done)
r.okToSend = false
@ -151,13 +166,13 @@ func (r *Runner) Cancel() {
r.cancel()
}
func (r *Runner) Done() <-chan any {
func (r *Runner) Done() <-chan struct{} {
return r.done
}
// Fail immediately halts updates and, if the job is not complete sends FailJob to the coder server. Running goroutines
// Fail immediately halts updates and, if the job is not complete sends FailJob to the coder server. Running goroutines
// are canceled but complete asynchronously (although they are prevented from further updating the job to the coder
// server). The provided context sets how long to keep trying to send the FailJob.
// server). The provided context sets how long to keep trying to send the FailJob.
func (r *Runner) Fail(ctx context.Context, f *proto.FailedJob) error {
f.JobId = r.job.JobId
r.mutex.Lock()
@ -227,12 +242,21 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
}
// doCleanFinish wraps a call to do() with cleaning up the job and setting the terminal messages
func (r *Runner) doCleanFinish() {
// push the fail/succeed write onto the defer stack before the cleanup, so that cleanup happens
// before this.
var failedJob *proto.FailedJob
var completedJob *proto.CompletedJob
func (r *Runner) doCleanFinish(ctx context.Context) {
var (
failedJob *proto.FailedJob
completedJob *proto.CompletedJob
)
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
// push the fail/succeed write onto the defer stack before the cleanup, so
// that cleanup happens before this.
defer func() {
_, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
if failedJob != nil {
r.setFail(failedJob)
return
@ -241,17 +265,20 @@ func (r *Runner) doCleanFinish() {
}()
defer func() {
_, err := r.update(r.notStopped, &proto.UpdateJobRequest{
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Cleaning Up",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
r.logger.Warn(r.notStopped, "failed to log cleanup")
r.logger.Warn(ctx, "failed to log cleanup")
return
}
@ -263,47 +290,52 @@ func (r *Runner) doCleanFinish() {
// When the provisioner daemon is shutting down,
// it may take a few milliseconds for processes to exit.
// See: https://github.com/golang/go/issues/50510
r.logger.Debug(r.notStopped, "failed to clean work directory; trying again", slog.Error(err))
r.logger.Debug(ctx, "failed to clean work directory; trying again", slog.Error(err))
time.Sleep(250 * time.Millisecond)
continue
}
r.logger.Debug(r.notStopped, "cleaned up work directory", slog.Error(err))
r.logger.Debug(ctx, "cleaned up work directory")
break
}
}()
completedJob, failedJob = r.do()
completedJob, failedJob = r.do(ctx)
}
// do actually does the work of running the job
func (r *Runner) do() (*proto.CompletedJob, *proto.FailedJob) {
func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
err := r.filesystem.MkdirAll(r.workDirectory, 0700)
if err != nil {
return nil, r.failedJobf("create work directory %q: %s", r.workDirectory, err)
}
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Setting up",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
r.logger.Info(r.notStopped, "unpacking template source archive",
slog.F("size_bytes", len(r.job.TemplateSourceArchive)))
r.logger.Info(ctx, "unpacking template source archive",
slog.F("size_bytes", len(r.job.TemplateSourceArchive)),
)
reader := tar.NewReader(bytes.NewBuffer(r.job.TemplateSourceArchive))
for {
header, err := reader.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, r.failedJobf("read template source archive: %s", err)
}
// #nosec
@ -352,24 +384,24 @@ func (r *Runner) do() (*proto.CompletedJob, *proto.FailedJob) {
case *proto.AcquiredJob_TemplateImport_:
r.logger.Debug(context.Background(), "acquired job is template import")
failedJob := r.runReadmeParse()
failedJob := r.runReadmeParse(ctx)
if failedJob != nil {
return nil, failedJob
}
return r.runTemplateImport()
return r.runTemplateImport(ctx)
case *proto.AcquiredJob_TemplateDryRun_:
r.logger.Debug(context.Background(), "acquired job is template dry-run",
slog.F("workspace_name", jobType.TemplateDryRun.Metadata.WorkspaceName),
slog.F("parameters", jobType.TemplateDryRun.ParameterValues),
)
return r.runTemplateDryRun()
return r.runTemplateDryRun(ctx)
case *proto.AcquiredJob_WorkspaceBuild_:
r.logger.Debug(context.Background(), "acquired job is workspace provision",
slog.F("workspace_name", jobType.WorkspaceBuild.WorkspaceName),
slog.F("state_length", len(jobType.WorkspaceBuild.State)),
slog.F("parameters", jobType.WorkspaceBuild.ParameterValues),
)
return r.runWorkspaceBuild()
return r.runWorkspaceBuild(ctx)
default:
return nil, r.failedJobf("unknown job type %q; ensure your provisioner daemon is up-to-date",
reflect.TypeOf(r.job.Type).String())
@ -378,9 +410,10 @@ func (r *Runner) do() (*proto.CompletedJob, *proto.FailedJob) {
// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
// is stalled, and allows the runner to learn if the job has been canceled by the user.
func (r *Runner) heartbeat() {
func (r *Runner) heartbeat(ctx context.Context) {
ticker := time.NewTicker(r.updateInterval)
defer ticker.Stop()
for {
select {
case <-r.notCanceled.Done():
@ -388,29 +421,29 @@ func (r *Runner) heartbeat() {
case <-ticker.C:
}
resp, err := r.update(r.notStopped, &proto.UpdateJobRequest{
resp, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
})
if err != nil {
err = r.Fail(r.notStopped, r.failedJobf("send periodic update: %s", err))
err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
if err != nil {
r.logger.Error(r.notStopped, "failed to call FailJob", slog.Error(err))
r.logger.Error(ctx, "failed to call FailJob", slog.Error(err))
}
return
}
if !resp.Canceled {
continue
}
r.logger.Info(r.notStopped, "attempting graceful cancelation")
r.logger.Info(ctx, "attempting graceful cancelation")
r.Cancel()
// Hard-cancel the job after a minute of pending cancelation.
timer := time.NewTimer(r.forceCancelInterval)
select {
case <-timer.C:
r.logger.Warn(r.notStopped, "Cancel timed out")
err := r.Fail(r.notStopped, r.failedJobf("Cancel timed out"))
r.logger.Warn(ctx, "Cancel timed out")
err := r.Fail(ctx, r.failedJobf("Cancel timed out"))
if err != nil {
r.logger.Warn(r.notStopped, "failed to call FailJob", slog.Error(err))
r.logger.Warn(ctx, "failed to call FailJob", slog.Error(err))
}
return
case <-r.Done():
@ -427,16 +460,19 @@ func (r *Runner) heartbeat() {
// versions.
const ReadmeFile = "README.md"
func (r *Runner) runReadmeParse() *proto.FailedJob {
func (r *Runner) runReadmeParse(ctx context.Context) *proto.FailedJob {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
fi, err := afero.ReadFile(r.filesystem, path.Join(r.workDirectory, ReadmeFile))
if err != nil {
_, err := r.update(r.notStopped, &proto.UpdateJobRequest{
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_DEBUG,
Stage: "No README.md provided",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
@ -446,13 +482,13 @@ func (r *Runner) runReadmeParse() *proto.FailedJob {
return nil
}
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Adding README.md...",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
Readme: fi,
})
@ -462,25 +498,28 @@ func (r *Runner) runReadmeParse() *proto.FailedJob {
return nil
}
func (r *Runner) runTemplateImport() (*proto.CompletedJob, *proto.FailedJob) {
func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
// Parse parameters and update the job with the parameter specs
_, err := r.update(r.notStopped, &proto.UpdateJobRequest{
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Parsing template parameters",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
parameterSchemas, err := r.runTemplateImportParse()
parameterSchemas, err := r.runTemplateImportParse(ctx)
if err != nil {
return nil, r.failedJobf("run parse: %s", err)
}
updateResponse, err := r.update(r.notStopped, &proto.UpdateJobRequest{
updateResponse, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
ParameterSchemas: parameterSchemas,
})
@ -500,19 +539,19 @@ func (r *Runner) runTemplateImport() (*proto.CompletedJob, *proto.FailedJob) {
}
// Determine persistent resources
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting persistent resources",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
startResources, err := r.runTemplateImportProvision(updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
startResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
WorkspaceTransition: sdkproto.WorkspaceTransition_START,
})
@ -521,19 +560,19 @@ func (r *Runner) runTemplateImport() (*proto.CompletedJob, *proto.FailedJob) {
}
// Determine ephemeral resources.
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting ephemeral resources",
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
stopResources, err := r.runTemplateImportProvision(updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
stopResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
WorkspaceTransition: sdkproto.WorkspaceTransition_STOP,
})
@ -553,8 +592,11 @@ func (r *Runner) runTemplateImport() (*proto.CompletedJob, *proto.FailedJob) {
}
// Parses parameter schemas from source.
func (r *Runner) runTemplateImportParse() ([]*sdkproto.ParameterSchema, error) {
stream, err := r.provisioner.Parse(r.notStopped, &sdkproto.Parse_Request{
func (r *Runner) runTemplateImportParse(ctx context.Context) ([]*sdkproto.ParameterSchema, error) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
stream, err := r.provisioner.Parse(ctx, &sdkproto.Parse_Request{
Directory: r.workDirectory,
})
if err != nil {
@ -573,12 +615,12 @@ func (r *Runner) runTemplateImportParse() ([]*sdkproto.ParameterSchema, error) {
slog.F("output", msgType.Log.Output),
)
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: "Parse parameters",
}},
@ -601,7 +643,10 @@ func (r *Runner) runTemplateImportParse() ([]*sdkproto.ParameterSchema, error) {
// Performs a dry-run provision when importing a template.
// This is used to detect resources that would be provisioned
// for a workspace in various states.
func (r *Runner) runTemplateImportProvision(values []*sdkproto.ParameterValue, metadata *sdkproto.Provision_Metadata) ([]*sdkproto.Resource, error) {
func (r *Runner) runTemplateImportProvision(ctx context.Context, values []*sdkproto.ParameterValue, metadata *sdkproto.Provision_Metadata) ([]*sdkproto.Resource, error) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
var stage string
switch metadata.WorkspaceTransition {
case sdkproto.WorkspaceTransition_START:
@ -611,7 +656,7 @@ func (r *Runner) runTemplateImportProvision(values []*sdkproto.ParameterValue, m
}
// use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us
// to send the cancel to the provisioner
stream, err := r.provisioner.Provision(r.notStopped)
stream, err := r.provisioner.Provision(ctx)
if err != nil {
return nil, xerrors.Errorf("provision: %w", err)
}
@ -653,12 +698,12 @@ func (r *Runner) runTemplateImportProvision(values []*sdkproto.ParameterValue, m
slog.F("level", msgType.Log.Level),
slog.F("output", msgType.Log.Output),
)
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
}},
@ -689,8 +734,10 @@ func (r *Runner) runTemplateImportProvision(values []*sdkproto.ParameterValue, m
}
}
func (r *Runner) runTemplateDryRun() (
*proto.CompletedJob, *proto.FailedJob) {
func (r *Runner) runTemplateDryRun(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
// Ensure all metadata fields are set as they are all optional for dry-run.
metadata := r.job.GetTemplateDryRun().GetMetadata()
metadata.WorkspaceTransition = sdkproto.WorkspaceTransition_START
@ -720,7 +767,7 @@ func (r *Runner) runTemplateDryRun() (
}
// Run the template import provision task since it's already a dry run.
resources, err := r.runTemplateImportProvision(
resources, err := r.runTemplateImportProvision(ctx,
r.job.GetTemplateDryRun().GetParameterValues(),
metadata,
)
@ -738,8 +785,10 @@ func (r *Runner) runTemplateDryRun() (
}, nil
}
func (r *Runner) runWorkspaceBuild() (
*proto.CompletedJob, *proto.FailedJob) {
func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
var stage string
switch r.job.GetWorkspaceBuild().Metadata.WorkspaceTransition {
case sdkproto.WorkspaceTransition_START:
@ -750,13 +799,13 @@ func (r *Runner) runWorkspaceBuild() (
stage = "Destroying workspace"
}
_, err := r.update(r.notStopped, &proto.UpdateJobRequest{
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: stage,
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
}},
})
if err != nil {
@ -765,7 +814,7 @@ func (r *Runner) runWorkspaceBuild() (
// use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us
// to send the cancel to the provisioner
stream, err := r.provisioner.Provision(r.notStopped)
stream, err := r.provisioner.Provision(ctx)
if err != nil {
return nil, r.failedJobf("provision: %s", err)
}
@ -809,12 +858,12 @@ func (r *Runner) runWorkspaceBuild() (
slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId),
)
_, err = r.update(r.notStopped, &proto.UpdateJobRequest{
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UTC().UnixMilli(),
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
}},
@ -867,3 +916,9 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
Error: fmt.Sprintf(format, args...),
}
}
func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
semconv.ServiceNameKey.String("coderd.provisionerd"),
))...)
}
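
Also not part of the commit — a small illustration of the span-per-function pattern the runner adopts: tracing.FuncName() names the span after the enclosing function and the returned context is threaded into child calls so updates and provisioner RPCs become child spans. The stage function and the noop provider here are stand-ins.

package main

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel/trace"

    "github.com/coder/coder/coderd/tracing"
)

// stage mirrors the runner's pattern: derive a child span named after the
// caller, defer span.End(), and pass ctx onward.
func stage(ctx context.Context, tracer trace.Tracer) {
    ctx, span := tracer.Start(ctx, tracing.FuncName())
    defer span.End()
    _ = ctx // pass ctx to child calls so they inherit the span
}

func main() {
    tracer := trace.NewNoopTracerProvider().Tracer("provisionerd")
    stage(context.Background(), tracer)
    fmt.Println("span ended")
}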