feat: Add buffering to provisioner job logs (#4918)

* feat: Add buffering to provisioner job logs

This should improve overall build performance, especially under load.

It removes the old `id` column on the `provisioner_job_logs` table
and replaces it with an auto-incrementing big integer to preserve order.

Funnily enough, we never had to care about ordering before because
inserts were always at least 1ms apart. With buffering they no longer
are, so the order has to be preserved explicitly.
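
As a rough sketch of the new model (the actual migration and query are
in the diff below), the log primary key becomes a BIGSERIAL and reads
are ordered by it:

    ALTER TABLE provisioner_job_logs DROP COLUMN id;
    ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY;

    -- Fetch a job's logs created after a known log ID, in insertion order.
    SELECT job_id, created_at, source, level, stage, output, id
    FROM provisioner_job_logs
    WHERE job_id = $1 AND id > $2
    ORDER BY id;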

* Fix log buffering

* Fix frontend log streaming

* Fix JS test
Kyle Carberry 2022-11-06 18:50:34 -08:00 committed by GitHub
parent 531f7cd489
commit 30281852d6
34 changed files with 300 additions and 476 deletions

View File

@ -16,14 +16,14 @@ import (
"github.com/coder/coder/codersdk"
)
func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID, before time.Time) error {
func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID) error {
return ProvisionerJob(ctx, writer, ProvisionerJobOptions{
Fetch: func() (codersdk.ProvisionerJob, error) {
build, err := client.WorkspaceBuild(ctx, build)
return build.Job, err
},
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
return client.WorkspaceBuildLogsAfter(ctx, build, before)
return client.WorkspaceBuildLogsAfter(ctx, build, 0)
},
})
}

View File

@ -139,7 +139,6 @@ func create() *cobra.Command {
return err
}
after := time.Now()
workspace, err := client.CreateWorkspace(cmd.Context(), organization.ID, codersdk.Me, codersdk.CreateWorkspaceRequest{
TemplateID: template.ID,
Name: workspaceName,
@ -151,7 +150,7 @@ func create() *cobra.Command {
return err
}
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID, after)
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID)
if err != nil {
return err
}
@ -238,7 +237,6 @@ PromptParamLoop:
_, _ = fmt.Fprintln(cmd.OutOrStdout())
// Run a dry-run with the given parameters to check correctness
after := time.Now()
dryRun, err := client.CreateTemplateVersionDryRun(cmd.Context(), templateVersion.ID, codersdk.CreateTemplateVersionDryRunRequest{
WorkspaceName: args.NewWorkspaceName,
ParameterValues: parameters,
@ -255,7 +253,7 @@ PromptParamLoop:
return client.CancelTemplateVersionDryRun(cmd.Context(), templateVersion.ID, dryRun.ID)
},
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, after)
return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, 0)
},
// Don't show log output for the dry-run unless there's an error.
Silent: true,

View File

@ -47,7 +47,6 @@ func deleteWorkspace() *cobra.Command {
)
}
before := time.Now()
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
Transition: codersdk.WorkspaceTransitionDelete,
ProvisionerState: state,
@ -57,7 +56,7 @@ func deleteWorkspace() *cobra.Command {
return err
}
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
if err != nil {
return err
}

View File

@ -79,7 +79,7 @@ func portForward() *cobra.Command {
return xerrors.New("workspace must be in start transition to port-forward")
}
if workspace.LatestBuild.Job.CompletedAt == nil {
err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt)
err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID)
if err != nil {
return err
}

View File

@ -250,7 +250,7 @@ func getWorkspaceAndAgent(ctx context.Context, cmd *cobra.Command, client *coder
return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, xerrors.New("workspace must be in start transition to ssh")
}
if workspace.LatestBuild.Job.CompletedAt == nil {
err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt)
err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID)
if err != nil {
return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, err
}

View File

@ -25,7 +25,6 @@ func start() *cobra.Command {
if err != nil {
return err
}
before := time.Now()
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
Transition: codersdk.WorkspaceTransitionStart,
})
@ -33,7 +32,7 @@ func start() *cobra.Command {
return err
}
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
if err != nil {
return err
}

View File

@ -5,7 +5,6 @@ import (
"io"
"os"
"strconv"
"time"
"github.com/spf13/cobra"
@ -100,7 +99,6 @@ func statePush() *cobra.Command {
return err
}
before := time.Now()
build, err = client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
TemplateVersionID: build.TemplateVersionID,
Transition: build.Transition,
@ -109,7 +107,7 @@ func statePush() *cobra.Command {
if err != nil {
return err
}
return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID, before)
return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID)
},
}
cmd.Flags().IntVarP(&buildNumber, "build", "b", 0, "Specify a workspace build to target by name.")

View File

@ -33,7 +33,6 @@ func stop() *cobra.Command {
if err != nil {
return err
}
before := time.Now()
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
Transition: codersdk.WorkspaceTransitionStop,
})
@ -41,7 +40,7 @@ func stop() *cobra.Command {
return err
}
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
if err != nil {
return err
}

View File

@ -160,7 +160,6 @@ type createValidTemplateVersionArgs struct {
}
func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVersionArgs, parameters ...codersdk.CreateParameterRequest) (*codersdk.TemplateVersion, []codersdk.CreateParameterRequest, error) {
before := time.Now()
client := args.Client
req := codersdk.CreateTemplateVersionRequest{
@ -187,7 +186,7 @@ func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVers
return client.CancelTemplateVersion(cmd.Context(), version.ID)
},
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, before)
return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, 0)
},
})
if err != nil {

View File

@ -2,7 +2,6 @@ package cli
import (
"fmt"
"time"
"github.com/spf13/cobra"
@ -57,7 +56,6 @@ func update() *cobra.Command {
return nil
}
before := time.Now()
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
TemplateVersionID: template.ActiveVersionID,
Transition: workspace.LatestBuild.Transition,
@ -66,7 +64,7 @@ func update() *cobra.Command {
if err != nil {
return err
}
logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before)
logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, 0)
if err != nil {
return err
}

View File

@ -2052,17 +2052,14 @@ func (q *fakeQuerier) GetProvisionerLogsByIDBetween(_ context.Context, arg datab
if jobLog.JobID != arg.JobID {
continue
}
if !arg.CreatedBefore.IsZero() && jobLog.CreatedAt.After(arg.CreatedBefore) {
if arg.CreatedBefore != 0 && jobLog.ID > arg.CreatedBefore {
continue
}
if !arg.CreatedAfter.IsZero() && jobLog.CreatedAt.Before(arg.CreatedAfter) {
if arg.CreatedAfter != 0 && jobLog.ID < arg.CreatedAfter {
continue
}
logs = append(logs, jobLog)
}
if len(logs) == 0 {
return nil, sql.ErrNoRows
}
return logs, nil
}
@ -2212,10 +2209,15 @@ func (q *fakeQuerier) InsertProvisionerJobLogs(_ context.Context, arg database.I
defer q.mutex.Unlock()
logs := make([]database.ProvisionerJobLog, 0)
id := int64(1)
if len(q.provisionerJobLogs) > 0 {
id = q.provisionerJobLogs[len(q.provisionerJobLogs)-1].ID
}
for index, output := range arg.Output {
id++
logs = append(logs, database.ProvisionerJobLog{
ID: id,
JobID: arg.JobID,
ID: arg.ID[index],
CreatedAt: arg.CreatedAt[index],
Source: arg.Source[index],
Level: arg.Level[index],

View File

@ -272,15 +272,24 @@ CREATE TABLE provisioner_daemons (
);
CREATE TABLE provisioner_job_logs (
id uuid NOT NULL,
job_id uuid NOT NULL,
created_at timestamp with time zone NOT NULL,
source log_source NOT NULL,
level log_level NOT NULL,
stage character varying(128) NOT NULL,
output character varying(1024) NOT NULL
output character varying(1024) NOT NULL,
id bigint NOT NULL
);
CREATE SEQUENCE provisioner_job_logs_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE provisioner_job_logs_id_seq OWNED BY provisioner_job_logs.id;
CREATE TABLE provisioner_jobs (
id uuid NOT NULL,
created_at timestamp with time zone NOT NULL,
@ -463,6 +472,8 @@ CREATE TABLE workspaces (
ALTER TABLE ONLY licenses ALTER COLUMN id SET DEFAULT nextval('licenses_id_seq'::regclass);
ALTER TABLE ONLY provisioner_job_logs ALTER COLUMN id SET DEFAULT nextval('provisioner_job_logs_id_seq'::regclass);
ALTER TABLE ONLY agent_stats
ADD CONSTRAINT agent_stats_pkey PRIMARY KEY (id);

View File

@ -0,0 +1,3 @@
ALTER TABLE provisioner_job_logs DROP COLUMN id;
ALTER TABLE provisioner_job_logs ADD COLUMN id uuid NOT NULL DEFAULT gen_random_uuid();

View File

@ -0,0 +1,3 @@
ALTER TABLE provisioner_job_logs DROP COLUMN id;
ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY;

View File

@ -545,13 +545,13 @@ type ProvisionerJob struct {
}
type ProvisionerJobLog struct {
ID uuid.UUID `db:"id" json:"id"`
JobID uuid.UUID `db:"job_id" json:"job_id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Source LogSource `db:"source" json:"source"`
Level LogLevel `db:"level" json:"level"`
Stage string `db:"stage" json:"stage"`
Output string `db:"output" json:"output"`
ID int64 `db:"id" json:"id"`
}
type Replica struct {

View File

@ -2345,23 +2345,21 @@ func (q *sqlQuerier) UpdateProvisionerDaemonByID(ctx context.Context, arg Update
const getProvisionerLogsByIDBetween = `-- name: GetProvisionerLogsByIDBetween :many
SELECT
id, job_id, created_at, source, level, stage, output
job_id, created_at, source, level, stage, output, id
FROM
provisioner_job_logs
WHERE
job_id = $1
AND (
created_at >= $2
OR created_at <= $3
)
ORDER BY
created_at DESC
id > $2
OR id < $3
) ORDER BY id
`
type GetProvisionerLogsByIDBetweenParams struct {
JobID uuid.UUID `db:"job_id" json:"job_id"`
CreatedAfter time.Time `db:"created_after" json:"created_after"`
CreatedBefore time.Time `db:"created_before" json:"created_before"`
CreatedAfter int64 `db:"created_after" json:"created_after"`
CreatedBefore int64 `db:"created_before" json:"created_before"`
}
func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetProvisionerLogsByIDBetweenParams) ([]ProvisionerJobLog, error) {
@ -2374,13 +2372,13 @@ func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetP
for rows.Next() {
var i ProvisionerJobLog
if err := rows.Scan(
&i.ID,
&i.JobID,
&i.CreatedAt,
&i.Source,
&i.Level,
&i.Stage,
&i.Output,
&i.ID,
); err != nil {
return nil, err
}
@ -2399,17 +2397,15 @@ const insertProvisionerJobLogs = `-- name: InsertProvisionerJobLogs :many
INSERT INTO
provisioner_job_logs
SELECT
unnest($1 :: uuid [ ]) AS id,
$2 :: uuid AS job_id,
unnest($3 :: timestamptz [ ]) AS created_at,
unnest($4 :: log_source [ ]) AS source,
unnest($5 :: log_level [ ]) AS LEVEL,
unnest($6 :: VARCHAR(128) [ ]) AS stage,
unnest($7 :: VARCHAR(1024) [ ]) AS output RETURNING id, job_id, created_at, source, level, stage, output
$1 :: uuid AS job_id,
unnest($2 :: timestamptz [ ]) AS created_at,
unnest($3 :: log_source [ ]) AS source,
unnest($4 :: log_level [ ]) AS LEVEL,
unnest($5 :: VARCHAR(128) [ ]) AS stage,
unnest($6 :: VARCHAR(1024) [ ]) AS output RETURNING job_id, created_at, source, level, stage, output, id
`
type InsertProvisionerJobLogsParams struct {
ID []uuid.UUID `db:"id" json:"id"`
JobID uuid.UUID `db:"job_id" json:"job_id"`
CreatedAt []time.Time `db:"created_at" json:"created_at"`
Source []LogSource `db:"source" json:"source"`
@ -2420,7 +2416,6 @@ type InsertProvisionerJobLogsParams struct {
func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertProvisionerJobLogsParams) ([]ProvisionerJobLog, error) {
rows, err := q.db.QueryContext(ctx, insertProvisionerJobLogs,
pq.Array(arg.ID),
arg.JobID,
pq.Array(arg.CreatedAt),
pq.Array(arg.Source),
@ -2436,13 +2431,13 @@ func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertPro
for rows.Next() {
var i ProvisionerJobLog
if err := rows.Scan(
&i.ID,
&i.JobID,
&i.CreatedAt,
&i.Source,
&i.Level,
&i.Stage,
&i.Output,
&i.ID,
); err != nil {
return nil, err
}

View File

@ -6,17 +6,14 @@ FROM
WHERE
job_id = @job_id
AND (
created_at >= @created_after
OR created_at <= @created_before
)
ORDER BY
created_at DESC;
id > @created_after
OR id < @created_before
) ORDER BY id;
-- name: InsertProvisionerJobLogs :many
INSERT INTO
provisioner_job_logs
SELECT
unnest(@id :: uuid [ ]) AS id,
@job_id :: uuid AS job_id,
unnest(@created_at :: timestamptz [ ]) AS created_at,
unnest(@source :: log_source [ ]) AS source,

View File

@ -368,7 +368,6 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
if err != nil {
return nil, xerrors.Errorf("convert log source: %w", err)
}
insertParams.ID = append(insertParams.ID, uuid.New())
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
insertParams.Level = append(insertParams.Level, logLevel)
insertParams.Stage = append(insertParams.Stage, log.Stage)
@ -384,10 +383,15 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
return nil, xerrors.Errorf("insert job logs: %w", err)
}
// Publish by the lowest log ID inserted so the
// log stream will fetch everything from that point.
lowestID := logs[0].ID
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs})
data, err := json.Marshal(provisionerJobLogsMessage{
CreatedAfter: lowestID,
})
if err != nil {
return nil, xerrors.Errorf("marshal job log: %w", err)
return nil, xerrors.Errorf("marshal: %w", err)
}
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
if err != nil {

View File

@ -24,8 +24,8 @@ import (
// Returns provisioner logs based on query parameters.
// The intended usage for a client to stream all logs (with JS API):
// const timestamp = new Date().getTime();
// 1. GET /logs?before=<timestamp>
// 2. GET /logs?after=<timestamp>&follow
// 1. GET /logs?before=<id>
// 2. GET /logs?after=<id>&follow
// The combination of these responses should provide all current logs
// to the consumer, and future logs are streamed in the follow request.
func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
@ -74,10 +74,11 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
}
}
var after time.Time
var after int64
// Only fetch logs created after the time provided.
if afterRaw != "" {
afterMS, err := strconv.ParseInt(afterRaw, 10, 64)
var err error
after, err = strconv.ParseInt(afterRaw, 10, 64)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Query param \"after\" must be an integer.",
@ -87,16 +88,12 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
})
return
}
after = time.UnixMilli(afterMS)
} else {
if follow {
after = database.Now()
}
}
var before time.Time
var before int64
// Only fetch logs created before the time provided.
if beforeRaw != "" {
beforeMS, err := strconv.ParseInt(beforeRaw, 10, 64)
var err error
before, err = strconv.ParseInt(beforeRaw, 10, 64)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Query param \"before\" must be an integer.",
@ -106,12 +103,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
})
return
}
before = time.UnixMilli(beforeMS)
} else {
// If we're following, we don't want logs before a timestamp!
if !follow {
before = database.Now()
}
}
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
@ -156,7 +147,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.
logIdsDone := make(map[uuid.UUID]bool)
logIdsDone := make(map[int64]bool)
// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(wsNetConn)
@ -370,8 +361,8 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string {
// provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel
type provisionerJobLogsMessage struct {
EndOfLogs bool `json:"end_of_logs,omitempty"`
Logs []database.ProvisionerJobLog `json:"logs,omitempty"`
CreatedAfter int64 `json:"created_after"`
EndOfLogs bool `json:"end_of_logs,omitempty"`
}
func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
@ -389,23 +380,32 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
return
default:
}
jlMsg := provisionerJobLogsMessage{}
err := json.Unmarshal(message, &jlMsg)
if err != nil {
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
return
}
if jlMsg.CreatedAfter != 0 {
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: jobID,
CreatedAfter: jlMsg.CreatedAfter,
})
if err != nil {
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
return
}
for _, log := range jlMsg.Logs {
select {
case bufferedLogs <- log:
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
default:
// If this overflows users could miss logs streaming. This can happen
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
// so just drop them.
logger.Warn(ctx, "provisioner job log overflowing channel")
for _, log := range logs {
select {
case bufferedLogs <- log:
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
default:
// If this overflows users could miss logs streaming. This can happen
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
// so just drop them.
logger.Warn(ctx, "provisioner job log overflowing channel")
}
}
}
if jlMsg.EndOfLogs {

View File

@ -1,160 +1,15 @@
package coderd
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/json"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/databasefake"
"github.com/coder/coder/coderd/rbac"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/testutil"
)
func TestProvisionerJobLogs_Unit(t *testing.T) {
t.Parallel()
t.Run("QueryPubSubDupes", func(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
// mDB := mocks.NewStore(t)
fDB := databasefake.New()
fPubsub := &fakePubSub{t: t, cond: sync.NewCond(&sync.Mutex{})}
opts := Options{
Logger: logger,
Database: fDB,
Pubsub: fPubsub,
}
api := New(&opts)
defer api.Close()
server := httptest.NewServer(api.RootHandler)
defer server.Close()
userID := uuid.New()
keyID, keySecret, err := generateAPIKeyIDSecret()
require.NoError(t, err)
hashed := sha256.Sum256([]byte(keySecret))
u, err := url.Parse(server.URL)
require.NoError(t, err)
client := codersdk.Client{
HTTPClient: server.Client(),
SessionToken: keyID + "-" + keySecret,
URL: u,
}
buildID := uuid.New()
workspaceID := uuid.New()
jobID := uuid.New()
expectedLogs := []database.ProvisionerJobLog{
{ID: uuid.New(), JobID: jobID, Stage: "Stage0"},
{ID: uuid.New(), JobID: jobID, Stage: "Stage1"},
{ID: uuid.New(), JobID: jobID, Stage: "Stage2"},
{ID: uuid.New(), JobID: jobID, Stage: "Stage3"},
}
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
// wow there are a lot of DB rows we touch...
_, err = fDB.InsertAPIKey(ctx, database.InsertAPIKeyParams{
ID: keyID,
HashedSecret: hashed[:],
UserID: userID,
ExpiresAt: time.Now().Add(5 * time.Hour),
LoginType: database.LoginTypePassword,
Scope: database.APIKeyScopeAll,
})
require.NoError(t, err)
_, err = fDB.InsertUser(ctx, database.InsertUserParams{
ID: userID,
RBACRoles: []string{rbac.RoleOwner()},
})
require.NoError(t, err)
_, err = fDB.InsertWorkspaceBuild(ctx, database.InsertWorkspaceBuildParams{
ID: buildID,
WorkspaceID: workspaceID,
JobID: jobID,
})
require.NoError(t, err)
_, err = fDB.InsertWorkspace(ctx, database.InsertWorkspaceParams{
ID: workspaceID,
})
require.NoError(t, err)
_, err = fDB.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
ID: jobID,
})
require.NoError(t, err)
for _, l := range expectedLogs[:2] {
_, err := fDB.InsertProvisionerJobLogs(ctx, database.InsertProvisionerJobLogsParams{
ID: []uuid.UUID{l.ID},
JobID: jobID,
Stage: []string{l.Stage},
})
require.NoError(t, err)
}
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, time.Now())
require.NoError(t, err)
defer closer.Close()
// when the endpoint calls subscribe, we get the listener here.
fPubsub.cond.L.Lock()
for fPubsub.listener == nil {
fPubsub.cond.Wait()
}
// endpoint should now be listening
assert.False(t, fPubsub.canceled)
assert.False(t, fPubsub.closed)
// send all the logs in two batches, duplicating what we already returned on the DB query.
msg := provisionerJobLogsMessage{}
msg.Logs = expectedLogs[:2]
data, err := json.Marshal(msg)
require.NoError(t, err)
fPubsub.listener(ctx, data)
msg.Logs = expectedLogs[2:]
data, err = json.Marshal(msg)
require.NoError(t, err)
fPubsub.listener(ctx, data)
// send end of logs
msg.Logs = nil
msg.EndOfLogs = true
data, err = json.Marshal(msg)
require.NoError(t, err)
fPubsub.listener(ctx, data)
var stages []string
for l := range logs {
logger.Info(ctx, "got log",
slog.F("id", l.ID),
slog.F("stage", l.Stage))
stages = append(stages, l.Stage)
}
assert.Equal(t, []string{"Stage0", "Stage1", "Stage2", "Stage3"}, stages)
for !fPubsub.canceled {
fPubsub.cond.Wait()
}
assert.False(t, fPubsub.closed)
})
}
func TestConvertProvisionerJob_Unit(t *testing.T) {
t.Parallel()
validNullTimeMock := sql.NullTime{
@ -260,39 +115,3 @@ func TestConvertProvisionerJob_Unit(t *testing.T) {
})
}
}
type fakePubSub struct {
t *testing.T
cond *sync.Cond
listener database.Listener
canceled bool
closed bool
}
func (f *fakePubSub) Subscribe(_ string, listener database.Listener) (cancel func(), err error) {
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.listener = listener
f.cond.Signal()
return f.cancel, nil
}
func (f *fakePubSub) Publish(_ string, _ []byte) error {
f.t.Fail()
return nil
}
func (f *fakePubSub) Close() error {
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.closed = true
f.cond.Signal()
return nil
}
func (f *fakePubSub) cancel() {
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.canceled = true
f.cond.Signal()
}

View File

@ -3,12 +3,10 @@ package coderd_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/testutil"
@ -38,13 +36,12 @@ func TestProvisionerJobLogs(t *testing.T) {
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
before := time.Now().UTC()
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0)
require.NoError(t, err)
defer closer.Close()
for {
@ -78,12 +75,11 @@ func TestProvisionerJobLogs(t *testing.T) {
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
before := database.Now()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0)
require.NoError(t, err)
defer closer.Close()
for {
@ -121,7 +117,7 @@ func TestProvisionerJobLogs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, time.Now())
logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, 0)
require.NoError(t, err)
require.Greater(t, len(logs), 1)
})

View File

@ -4,7 +4,6 @@ import (
"context"
"net/http"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@ -430,7 +429,6 @@ func TestTemplateVersionLogs(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client)
before := time.Now()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionDryRun: echo.ProvisionComplete,
@ -465,7 +463,7 @@ func TestTemplateVersionLogs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, before)
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0)
require.NoError(t, err)
defer closer.Close()
for {
@ -625,7 +623,6 @@ func TestTemplateVersionDryRun(t *testing.T) {
defer cancel()
// Create template version dry-run
after := time.Now()
job, err := client.CreateTemplateVersionDryRun(ctx, version.ID, codersdk.CreateTemplateVersionDryRunRequest{
ParameterValues: []codersdk.CreateParameterRequest{},
})
@ -637,7 +634,7 @@ func TestTemplateVersionDryRun(t *testing.T) {
require.Equal(t, job.ID, newJob.ID)
// Stream logs
logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, after)
logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, 0)
require.NoError(t, err)
defer closer.Close()

View File

@ -452,7 +452,6 @@ func TestWorkspaceBuildLogs(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client)
before := time.Now()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
Provision: []*proto.Provision_Response{{
@ -487,7 +486,7 @@ func TestWorkspaceBuildLogs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before.Add(-time.Hour))
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0)
require.NoError(t, err)
defer closer.Close()
for {

View File

@ -76,7 +76,7 @@ type ProvisionerJob struct {
}
type ProvisionerJobLog struct {
ID uuid.UUID `json:"id"`
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
Source LogSource `json:"log_source"`
Level LogLevel `json:"log_level"`
@ -87,10 +87,10 @@ type ProvisionerJobLog struct {
// provisionerJobLogsBefore provides log output that occurred before a time.
// This is abstracted from a specific job type to provide consistency between
// APIs. Logs is the only shared route between jobs.
func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before time.Time) ([]ProvisionerJobLog, error) {
func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before int64) ([]ProvisionerJobLog, error) {
values := url.Values{}
if !before.IsZero() {
values["before"] = []string{strconv.FormatInt(before.UTC().UnixMilli(), 10)}
if before != 0 {
values["before"] = []string{strconv.FormatInt(before, 10)}
}
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?%s", path, values.Encode()), nil)
if err != nil {
@ -106,10 +106,10 @@ func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, befo
}
// provisionerJobLogsAfter streams logs that occurred after a specific time.
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
afterQuery := ""
if !after.IsZero() {
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
if after != 0 {
afterQuery = fmt.Sprintf("&after=%d", after)
}
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil {

View File

@ -93,13 +93,13 @@ func (c *Client) TemplateVersionResources(ctx context.Context, version uuid.UUID
return resources, json.NewDecoder(res.Body).Decode(&resources)
}
// TemplateVersionLogsBefore returns logs that occurred before a specific time.
func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) {
// TemplateVersionLogsBefore returns logs that occurred before a specific log ID.
func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before int64) ([]ProvisionerJobLog, error) {
return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), before)
}
// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific time.
func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific log ID.
func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), after)
}
@ -159,14 +159,14 @@ func (c *Client) TemplateVersionDryRunResources(ctx context.Context, version, jo
}
// TemplateVersionDryRunLogsBefore returns logs for a template version dry-run
// that occurred before a specific time.
func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) {
// that occurred before a specific log ID.
func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before int64) ([]ProvisionerJobLog, error) {
return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), before)
}
// TemplateVersionDryRunLogsAfter streams logs for a template version dry-run
// that occurred after a specific time.
func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
// that occurred after a specific log ID.
func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), after)
}

View File

@ -117,13 +117,13 @@ func (c *Client) CancelWorkspaceBuild(ctx context.Context, id uuid.UUID) error {
return nil
}
// WorkspaceBuildLogsBefore returns logs that occurred before a specific time.
func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) {
// WorkspaceBuildLogsBefore returns logs that occurred before a specific log ID.
func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before int64) ([]ProvisionerJobLog, error) {
return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), before)
}
// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific time.
func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific log ID.
func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), after)
}

View File

@ -41,14 +41,13 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
req.Name = "test-" + randName
}
after := time.Now()
workspace, err := r.client.CreateWorkspace(ctx, r.cfg.OrganizationID, r.cfg.UserID, req)
if err != nil {
return xerrors.Errorf("create workspace: %w", err)
}
r.workspaceID = workspace.ID
err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID, after)
err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID)
if err != nil {
return xerrors.Errorf("wait for build: %w", err)
}
@ -68,7 +67,6 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error {
return nil
}
after := time.Now()
build, err := r.client.CreateWorkspaceBuild(ctx, r.workspaceID, codersdk.CreateWorkspaceBuildRequest{
Transition: codersdk.WorkspaceTransitionDelete,
})
@ -78,7 +76,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error {
// TODO: capture these logs
logs := io.Discard
err = waitForBuild(ctx, logs, r.client, build.ID, after)
err = waitForBuild(ctx, logs, r.client, build.ID)
if err != nil {
return xerrors.Errorf("wait for build: %w", err)
}
@ -86,7 +84,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error {
return nil
}
func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID, after time.Time) error {
func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID) error {
_, _ = fmt.Fprint(w, "Build is currently queued...")
// Wait for build to start.
@ -106,7 +104,7 @@ func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, bui
_, _ = fmt.Fprintln(w, "\nBuild started! Streaming logs below:")
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, after)
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, 0)
if err != nil {
return xerrors.Errorf("start streaming build logs: %w", err)
}

View File

@ -50,6 +50,7 @@ type Options struct {
ForceCancelInterval time.Duration
UpdateInterval time.Duration
LogDebounceInterval time.Duration
PollInterval time.Duration
Provisioners Provisioners
WorkDirectory string
@ -66,6 +67,9 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts.ForceCancelInterval == 0 {
opts.ForceCancelInterval = time.Minute
}
if opts.LogDebounceInterval == 0 {
opts.LogDebounceInterval = 50 * time.Millisecond
}
if opts.Filesystem == nil {
opts.Filesystem = afero.NewOsFs()
}
@ -315,7 +319,7 @@ func (p *Server) acquireJob(ctx context.Context) {
return
}
p.activeJob = runner.NewRunner(
p.activeJob = runner.New(
ctx,
job,
p,
@ -325,6 +329,7 @@ func (p *Server) acquireJob(ctx context.Context) {
provisioner,
p.opts.UpdateInterval,
p.opts.ForceCancelInterval,
p.opts.LogDebounceInterval,
p.tracer,
p.opts.Metrics.Runner,
)

View File

@ -558,11 +558,17 @@ func TestProvisionerd(t *testing.T) {
}, nil
},
updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER {
// Close on a log so we know when the job is in progress!
updated.Do(func() {
close(updateChan)
})
if len(update.Logs) > 0 {
for _, log := range update.Logs {
if log.Source != proto.LogSource_PROVISIONER {
continue
}
// Close on a log so we know when the job is in progress!
updated.Do(func() {
close(updateChan)
})
break
}
}
return &proto.UpdateJobResponse{}, nil
},
@ -634,11 +640,17 @@ func TestProvisionerd(t *testing.T) {
},
updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
resp := &proto.UpdateJobResponse{}
if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER {
// Close on a log so we know when the job is in progress!
updated.Do(func() {
close(updateChan)
})
if len(update.Logs) > 0 {
for _, log := range update.Logs {
if log.Source != proto.LogSource_PROVISIONER {
continue
}
// Close on a log so we know when the job is in progress!
updated.Do(func() {
close(updateChan)
})
break
}
}
// start returning Canceled once we've gotten at least one log.
select {

View File

@ -33,6 +33,10 @@ const (
MissingParameterErrorText = "missing parameter"
)
var (
errUpdateSkipped = xerrors.New("update skipped; job complete or failed")
)
type Runner struct {
tracer trace.Tracer
metrics Metrics
@ -44,6 +48,7 @@ type Runner struct {
provisioner sdkproto.DRPCProvisionerClient
updateInterval time.Duration
forceCancelInterval time.Duration
logBufferInterval time.Duration
// closed when the Runner is finished sending any updates/failed/complete.
done chan struct{}
@ -57,9 +62,11 @@ type Runner struct {
// mutex controls access to all the following variables.
mutex *sync.Mutex
// used to wait for the failedJob or completedJob to be populated
cond *sync.Cond
failedJob *proto.FailedJob
completedJob *proto.CompletedJob
cond *sync.Cond
flushLogsTimer *time.Timer
queuedLogs []*proto.Log
failedJob *proto.FailedJob
completedJob *proto.CompletedJob
// setting this false signals that no more messages about this job should be sent. Usually this
// means that a terminal message like FailedJob or CompletedJob has been sent, even in the case
// of a Cancel(). However, when someone calls Fail() or ForceStop(), we might not send the
@ -79,7 +86,7 @@ type JobUpdater interface {
CompleteJob(ctx context.Context, in *proto.CompletedJob) error
}
func NewRunner(
func New(
ctx context.Context,
job *proto.AcquiredJob,
updater JobUpdater,
@ -89,6 +96,7 @@ func NewRunner(
provisioner sdkproto.DRPCProvisionerClient,
updateInterval time.Duration,
forceCancelInterval time.Duration,
logDebounceInterval time.Duration,
tracer trace.Tracer,
metrics Metrics,
) *Runner {
@ -109,6 +117,8 @@ func NewRunner(
provisioner: provisioner,
updateInterval: updateInterval,
forceCancelInterval: forceCancelInterval,
logBufferInterval: logDebounceInterval,
queuedLogs: make([]*proto.Log, 0),
mutex: m,
cond: sync.NewCond(m),
done: make(chan struct{}),
@ -262,7 +272,7 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.okToSend {
return nil, xerrors.New("update skipped; job complete or failed")
return nil, errUpdateSkipped
}
return r.sender.UpdateJob(ctx, u)
}
@ -291,19 +301,12 @@ func (r *Runner) doCleanFinish(ctx context.Context) {
ctx, span := r.startTrace(ctx, tracing.FuncName())
defer span.End()
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Cleaning Up",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Cleaning Up",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
r.logger.Warn(ctx, "failed to log cleanup")
return
}
// Cleanup the work directory after execution.
for attempt := 0; attempt < 5; attempt++ {
@ -320,6 +323,8 @@ func (r *Runner) doCleanFinish(ctx context.Context) {
r.logger.Debug(ctx, "cleaned up work directory")
break
}
r.flushQueuedLogs(ctx)
}()
completedJob, failedJob = r.do(ctx)
@ -335,14 +340,11 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
return nil, r.failedJobf("create work directory %q: %s", r.workDirectory, err)
}
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Setting up",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Setting up",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
@ -402,7 +404,6 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
)
}
}
switch jobType := r.job.Type.(type) {
case *proto.AcquiredJob_TemplateImport_:
r.logger.Debug(context.Background(), "acquired job is template import")
@ -489,19 +490,12 @@ func (r *Runner) runReadmeParse(ctx context.Context) *proto.FailedJob {
fi, err := afero.ReadFile(r.filesystem, path.Join(r.workDirectory, ReadmeFile))
if err != nil {
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_DEBUG,
Stage: "No README.md provided",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_DEBUG,
Stage: "No README.md provided",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return r.failedJobf("write log: %s", err)
}
return nil
}
@ -526,18 +520,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p
defer span.End()
// Parse parameters and update the job with the parameter specs
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Parsing template parameters",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Parsing template parameters",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
parameterSchemas, err := r.runTemplateImportParse(ctx)
if err != nil {
return nil, r.failedJobf("run parse: %s", err)
@ -562,18 +550,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p
}
// Determine persistent resources
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting persistent resources",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting persistent resources",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
startResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
WorkspaceTransition: sdkproto.WorkspaceTransition_START,
@ -583,18 +565,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p
}
// Determine ephemeral resources.
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting ephemeral resources",
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: "Detecting ephemeral resources",
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
stopResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
WorkspaceTransition: sdkproto.WorkspaceTransition_STOP,
@ -638,19 +614,13 @@ func (r *Runner) runTemplateImportParse(ctx context.Context) ([]*sdkproto.Parame
slog.F("output", msgType.Log.Output),
)
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: "Parse parameters",
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: "Parse parameters",
})
if err != nil {
return nil, xerrors.Errorf("update job: %w", err)
}
case *sdkproto.Parse_Response_Complete:
r.logger.Info(context.Background(), "parse complete",
slog.F("parameter_schemas", msgType.Complete.ParameterSchemas))
@ -721,19 +691,13 @@ func (r *Runner) runTemplateImportProvision(ctx context.Context, values []*sdkpr
slog.F("level", msgType.Log.Level),
slog.F("output", msgType.Log.Output),
)
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
})
if err != nil {
return nil, xerrors.Errorf("send job update: %w", err)
}
case *sdkproto.Provision_Response_Complete:
if msgType.Complete.Error != "" {
r.logger.Info(context.Background(), "dry-run provision failure",
@ -822,18 +786,12 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p
stage = "Destroying workspace"
}
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: stage,
CreatedAt: time.Now().UnixMilli(),
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER_DAEMON,
Level: sdkproto.LogLevel_INFO,
Stage: stage,
CreatedAt: time.Now().UnixMilli(),
})
if err != nil {
return nil, r.failedJobf("write log: %s", err)
}
// use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us
// to send the cancel to the provisioner
@ -881,19 +839,13 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p
slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId),
)
_, err = r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: []*proto.Log{{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
}},
r.queueLog(ctx, &proto.Log{
Source: proto.LogSource_PROVISIONER,
Level: msgType.Log.Level,
CreatedAt: time.Now().UnixMilli(),
Output: msgType.Log.Output,
Stage: stage,
})
if err != nil {
return nil, r.failedJobf("send job update: %s", err)
}
case *sdkproto.Provision_Response_Complete:
if msgType.Complete.Error != "" {
r.logger.Info(context.Background(), "provision failed; updating state",
@ -945,3 +897,41 @@ func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.Span
semconv.ServiceNameKey.String("coderd.provisionerd"),
))...)
}
func (r *Runner) queueLog(ctx context.Context, log *proto.Log) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.queuedLogs = append(r.queuedLogs, log)
if r.flushLogsTimer != nil {
r.flushLogsTimer.Reset(r.logBufferInterval)
return
}
if len(r.queuedLogs) > 100 {
// Flushing logs requires a lock, so this can happen async.
go r.flushQueuedLogs(ctx)
return
}
r.flushLogsTimer = time.AfterFunc(r.logBufferInterval, func() {
r.flushQueuedLogs(ctx)
})
}
func (r *Runner) flushQueuedLogs(ctx context.Context) {
r.mutex.Lock()
if r.flushLogsTimer != nil {
r.flushLogsTimer.Stop()
}
logs := r.queuedLogs[:]
r.queuedLogs = make([]*proto.Log, 0)
r.mutex.Unlock()
_, err := r.update(ctx, &proto.UpdateJobRequest{
JobId: r.job.JobId,
Logs: logs,
})
if err != nil {
if errors.Is(err, errUpdateSkipped) {
return
}
r.logger.Error(ctx, "flush queued logs", slog.Error(err))
}
}

View File

@ -538,7 +538,7 @@ export interface ProvisionerJob {
// From codersdk/provisionerdaemons.go
export interface ProvisionerJobLog {
readonly id: string
readonly id: number
readonly created_at: string
readonly log_source: LogSource
readonly log_level: LogLevel

View File

@ -5,7 +5,7 @@ describe("groupLogsByStage", () => {
it("should group them by stage", () => {
const input: ProvisionerJobLog[] = [
{
id: "1",
id: 1,
created_at: "oct 13",
log_source: "provisioner",
log_level: "debug",
@ -13,7 +13,7 @@ describe("groupLogsByStage", () => {
output: "test",
},
{
id: "2",
id: 2,
created_at: "oct 13",
log_source: "provisioner",
log_level: "debug",
@ -21,7 +21,7 @@ describe("groupLogsByStage", () => {
output: "test",
},
{
id: "3",
id: 3,
created_at: "oct 13",
log_source: "provisioner",
log_level: "debug",

View File

@ -543,7 +543,7 @@ export const MockGitSSHKey: TypesGen.GitSSHKey = {
export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
{
id: "836f8ab6-5202-4711-afa5-293394ced011",
id: 1,
created_at: "2022-05-19T16:45:31.005Z",
log_source: "provisioner_daemon",
log_level: "info",
@ -551,7 +551,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "2db0ae92-b310-4a6e-8b1f-23380b70ac7f",
id: 2,
created_at: "2022-05-19T16:45:31.006Z",
log_source: "provisioner_daemon",
log_level: "info",
@ -559,7 +559,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "37a5b7b1-b3eb-47cf-b80b-bd16e2e08a3d",
id: 3,
created_at: "2022-05-19T16:45:31.072Z",
log_source: "provisioner",
log_level: "debug",
@ -567,7 +567,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "5e4e37a1-c217-48bc-84f5-7f1c3efbd042",
id: 4,
created_at: "2022-05-19T16:45:31.073Z",
log_source: "provisioner",
log_level: "debug",
@ -575,7 +575,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Initializing the backend...",
},
{
id: "060ed132-5d12-4584-9005-5c9557febe2f",
id: 5,
created_at: "2022-05-19T16:45:31.077Z",
log_source: "provisioner",
log_level: "debug",
@ -583,7 +583,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "b2e70a1c-1943-4616-8ac9-25326c9f7e7b",
id: 6,
created_at: "2022-05-19T16:45:31.078Z",
log_source: "provisioner",
log_level: "debug",
@ -591,7 +591,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Initializing provider plugins...",
},
{
id: "993107fe-6dfb-42ec-912a-b32f50e60d62",
id: 7,
created_at: "2022-05-19T16:45:31.078Z",
log_source: "provisioner",
log_level: "debug",
@ -599,7 +599,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: '- Finding hashicorp/google versions matching "~\u003e 4.15"...',
},
{
id: "2ad2e2a1-7a75-4827-8cb9-928acfc6fc07",
id: 8,
created_at: "2022-05-19T16:45:31.123Z",
log_source: "provisioner",
log_level: "debug",
@ -607,7 +607,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: '- Finding coder/coder versions matching "0.3.4"...',
},
{
id: "7c723a90-0190-4c2f-9d97-ede39ef3d55f",
id: 9,
created_at: "2022-05-19T16:45:31.137Z",
log_source: "provisioner",
log_level: "debug",
@ -615,7 +615,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "- Using hashicorp/google v4.21.0 from the shared cache directory",
},
{
id: "3910144b-411b-4a53-9900-88d406ed9bf4",
id: 10,
created_at: "2022-05-19T16:45:31.344Z",
log_source: "provisioner",
log_level: "debug",
@ -623,7 +623,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "- Using coder/coder v0.3.4 from the shared cache directory",
},
{
id: "e3a02ad4-edc0-442f-8b9a-39d01d56b43b",
id: 11,
created_at: "2022-05-19T16:45:31.388Z",
log_source: "provisioner",
log_level: "debug",
@ -631,7 +631,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "440cceb3-aabf-4838-979b-1fd37fe2d8d8",
id: 12,
created_at: "2022-05-19T16:45:31.388Z",
log_source: "provisioner",
log_level: "debug",
@ -640,7 +640,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"Terraform has created a lock file .terraform.lock.hcl to record the provider",
},
{
id: "90e1f244-78ff-4d95-871e-b2bebcabc39a",
id: 13,
created_at: "2022-05-19T16:45:31.389Z",
log_source: "provisioner",
log_level: "debug",
@ -649,7 +649,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"selections it made above. Include this file in your version control repository",
},
{
id: "e4527d6c-2412-452b-a946-5870787caf6b",
id: 14,
created_at: "2022-05-19T16:45:31.389Z",
log_source: "provisioner",
log_level: "debug",
@ -658,7 +658,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"so that Terraform can guarantee to make the same selections by default when",
},
{
id: "02f96d19-d94b-4d0e-a1c4-313a0d2ff9e3",
id: 15,
created_at: "2022-05-19T16:45:31.39Z",
log_source: "provisioner",
log_level: "debug",
@ -666,7 +666,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: 'you run "terraform init" in the future.',
},
{
id: "667c03ca-1b24-4f36-a598-f0322cf3e2a1",
id: 16,
created_at: "2022-05-19T16:45:31.39Z",
log_source: "provisioner",
log_level: "debug",
@ -674,7 +674,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "",
},
{
id: "48039d6a-9b21-460f-9ca3-4b0e2becfd18",
id: 17,
created_at: "2022-05-19T16:45:31.391Z",
log_source: "provisioner",
log_level: "debug",
@ -682,7 +682,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Terraform has been successfully initialized!",
},
{
id: "6fe4b64f-3aa6-4850-96e9-6db8478a53be",
id: 18,
created_at: "2022-05-19T16:45:31.42Z",
log_source: "provisioner",
log_level: "info",
@ -690,7 +690,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Terraform 1.1.9",
},
{
id: "fa7b6321-7ecd-492d-a671-6366186fad08",
id: 19,
created_at: "2022-05-19T16:45:33.537Z",
log_source: "provisioner",
log_level: "info",
@ -698,7 +698,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "coder_agent.dev: Plan to create",
},
{
id: "e677e49f-c5ba-417c-8c9d-78bdad744ce1",
id: 20,
created_at: "2022-05-19T16:45:33.537Z",
log_source: "provisioner",
log_level: "info",
@ -706,7 +706,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_disk.root: Plan to create",
},
{
id: "4b0e6168-29e4-4419-bf81-b57e31087666",
id: 21,
created_at: "2022-05-19T16:45:33.538Z",
log_source: "provisioner",
log_level: "info",
@ -714,7 +714,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_instance.dev[0]: Plan to create",
},
{
id: "5902f89c-8acd-45e2-9bd6-de4d6fd8fc9c",
id: 22,
created_at: "2022-05-19T16:45:33.539Z",
log_source: "provisioner",
log_level: "info",
@ -722,7 +722,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Plan: 3 to add, 0 to change, 0 to destroy.",
},
{
id: "a8107907-7c53-4aae-bb48-9a5f9759c7d5",
id: 23,
created_at: "2022-05-19T16:45:33.712Z",
log_source: "provisioner",
log_level: "info",
@ -730,7 +730,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "coder_agent.dev: Creating...",
},
{
id: "aaf13503-2f1a-4f6c-aced-b8fc48304dc1",
id: 24,
created_at: "2022-05-19T16:45:33.719Z",
log_source: "provisioner",
log_level: "info",
@ -739,7 +739,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"coder_agent.dev: Creation complete after 0s [id=d07f5bdc-4a8d-4919-9cdb-0ac6ba9e64d6]",
},
{
id: "4ada8886-f5b3-4fee-a1a3-72064b50d5ae",
id: 25,
created_at: "2022-05-19T16:45:34.139Z",
log_source: "provisioner",
log_level: "info",
@ -747,7 +747,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_disk.root: Creating...",
},
{
id: "8ffc59e8-a4d0-4ffe-9bcc-cb84ca51cc22",
id: 26,
created_at: "2022-05-19T16:45:44.14Z",
log_source: "provisioner",
log_level: "info",
@ -755,7 +755,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_disk.root: Still creating... [10s elapsed]",
},
{
id: "063189fd-75ad-415a-ac77-8c34b9e202b2",
id: 27,
created_at: "2022-05-19T16:45:47.106Z",
log_source: "provisioner",
log_level: "info",
@ -764,7 +764,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"google_compute_disk.root: Creation complete after 13s [id=projects/bruno-coder-v2/zones/europe-west4-b/disks/coder-developer-bruno-dev-123-root]",
},
{
id: "6fd554a1-a7a2-439f-b8d8-369d6c1ead21",
id: 28,
created_at: "2022-05-19T16:45:47.118Z",
log_source: "provisioner",
log_level: "info",
@ -772,7 +772,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_instance.dev[0]: Creating...",
},
{
id: "87388f7e-ab01-44b1-b35e-8e06636164d3",
id: 29,
created_at: "2022-05-19T16:45:57.122Z",
log_source: "provisioner",
log_level: "info",
@ -780,7 +780,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "google_compute_instance.dev[0]: Still creating... [10s elapsed]",
},
{
id: "baa40120-3f18-40d2-a35c-b11f421a1ce1",
id: 30,
created_at: "2022-05-19T16:46:00.837Z",
log_source: "provisioner",
log_level: "info",
@ -789,7 +789,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
"google_compute_instance.dev[0]: Creation complete after 14s [id=projects/bruno-coder-v2/zones/europe-west4-b/instances/coder-developer-bruno-dev-123]",
},
{
id: "00e18953-fba6-4b43-97a3-ecf376553c08",
id: 31,
created_at: "2022-05-19T16:46:00.846Z",
log_source: "provisioner",
log_level: "info",
@ -797,7 +797,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Apply complete! Resources: 3 added, 0 changed, 0 destroyed.",
},
{
id: "431811da-b534-4d92-b6e5-44814548c812",
id: 32,
created_at: "2022-05-19T16:46:00.847Z",
log_source: "provisioner",
log_level: "info",
@ -805,7 +805,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [
output: "Outputs: 0",
},
{
id: "70459334-4878-4bda-a546-98eee166c4c6",
id: 33,
created_at: "2022-05-19T16:46:02.283Z",
log_source: "provisioner_daemon",
log_level: "info",

View File

@ -125,11 +125,14 @@ export const workspaceBuildMachine = createMachine(
API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor),
streamWorkspaceBuildLogs: (ctx) => async (callback) => {
return new Promise<void>((resolve, reject) => {
if (!ctx.logs) {
return reject("logs must be set")
}
const proto = location.protocol === "https:" ? "wss:" : "ws:"
const socket = new WebSocket(
`${proto}//${location.host}/api/v2/workspacebuilds/${
ctx.buildId
}/logs?follow=true&after=${ctx.timeCursor.getTime()}`,
}/logs?follow=true&after=${ctx.logs[ctx.logs.length - 1].id}`,
)
socket.binaryType = "blob"
socket.addEventListener("message", (event) => {