mirror of https://github.com/coder/coder.git
fix: add some missing workspace updates (#7790)
* Standardize on function to get workspace channel name There were two, now there is one. * Add some missing workspace updates There are some failure cases where we do not set the type as a workspace build which causes the workspace update to never be published. * Make build failures warnings Otherwise the associated test fails due to the logger fataling on error messages.
This commit is contained in:
parent
970a829939
commit
7ed17b2605
|
@ -643,10 +643,14 @@ func CreateWorkspaceBuild(
|
|||
client *codersdk.Client,
|
||||
workspace codersdk.Workspace,
|
||||
transition database.WorkspaceTransition,
|
||||
mutators ...func(*codersdk.CreateWorkspaceBuildRequest),
|
||||
) codersdk.WorkspaceBuild {
|
||||
req := codersdk.CreateWorkspaceBuildRequest{
|
||||
Transition: codersdk.WorkspaceTransition(transition),
|
||||
}
|
||||
for _, mut := range mutators {
|
||||
mut(&req)
|
||||
}
|
||||
build, err := client.CreateWorkspaceBuild(context.Background(), workspace.ID, req)
|
||||
require.NoError(t, err)
|
||||
return build
|
||||
|
|
|
@ -632,9 +632,6 @@ func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*p
|
|||
|
||||
switch jobType := failJob.Type.(type) {
|
||||
case *proto.FailedJob_WorkspaceBuild_:
|
||||
if jobType.WorkspaceBuild.State == nil {
|
||||
break
|
||||
}
|
||||
var input WorkspaceProvisionJob
|
||||
err = json.Unmarshal(job.Input, &input)
|
||||
if err != nil {
|
||||
|
@ -642,21 +639,23 @@ func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*p
|
|||
}
|
||||
|
||||
var build database.WorkspaceBuild
|
||||
err := server.Database.InTx(func(db database.Store) error {
|
||||
workspaceBuild, err := db.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
||||
err = server.Database.InTx(func(db database.Store) error {
|
||||
build, err = db.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get workspace build: %w", err)
|
||||
}
|
||||
|
||||
build, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
||||
ID: input.WorkspaceBuildID,
|
||||
UpdatedAt: database.Now(),
|
||||
ProvisionerState: jobType.WorkspaceBuild.State,
|
||||
Deadline: workspaceBuild.Deadline,
|
||||
MaxDeadline: workspaceBuild.MaxDeadline,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("update workspace build state: %w", err)
|
||||
if jobType.WorkspaceBuild.State != nil {
|
||||
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
||||
ID: input.WorkspaceBuildID,
|
||||
UpdatedAt: database.Now(),
|
||||
ProvisionerState: jobType.WorkspaceBuild.State,
|
||||
Deadline: build.Deadline,
|
||||
MaxDeadline: build.MaxDeadline,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("update workspace build state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -956,7 +956,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
}
|
||||
|
||||
cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), sendUpdate)
|
||||
cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), sendUpdate)
|
||||
if err != nil {
|
||||
_ = sendEvent(ctx, codersdk.ServerSentEvent{
|
||||
Type: codersdk.ServerSentEventTypeError,
|
||||
|
@ -1243,12 +1243,8 @@ func validWorkspaceSchedule(s *string) (sql.NullString, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func watchWorkspaceChannel(id uuid.UUID) string {
|
||||
return fmt.Sprintf("workspace:%s", id)
|
||||
}
|
||||
|
||||
func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
|
||||
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
|
||||
err := api.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{})
|
||||
if err != nil {
|
||||
api.Logger.Warn(ctx, "failed to publish workspace update",
|
||||
slog.F("workspace_id", workspaceID), slog.Error(err))
|
||||
|
|
|
@ -2037,6 +2037,7 @@ func TestWorkspaceExtend(t *testing.T) {
|
|||
func TestWorkspaceWatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, closeFunc := coderdtest.NewWithProvisionerCloser(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
|
||||
defer closeFunc.Close()
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
authToken := uuid.NewString()
|
||||
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
|
@ -2120,7 +2121,6 @@ func TestWorkspaceWatcher(t *testing.T) {
|
|||
return w.LatestBuild.Resources[0].Agents[0].Status == codersdk.WorkspaceAgentDisconnected
|
||||
})
|
||||
|
||||
closeFunc.Close()
|
||||
build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
|
||||
wait("first is for the workspace build itself", nil)
|
||||
err = client.CancelWorkspaceBuild(ctx, build.ID)
|
||||
|
@ -2133,13 +2133,37 @@ func TestWorkspaceWatcher(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
wait("update workspace name", nil)
|
||||
|
||||
// Add a new version that will fail.
|
||||
updatedVersion := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
Parse: echo.ParseComplete,
|
||||
ProvisionPlan: echo.ProvisionComplete,
|
||||
ProvisionApply: []*proto.Provision_Response{{
|
||||
Type: &proto.Provision_Response_Complete{
|
||||
Complete: &proto.Provision_Complete{
|
||||
Error: "test error",
|
||||
},
|
||||
},
|
||||
}},
|
||||
}, func(req *codersdk.CreateTemplateVersionRequest) {
|
||||
req.TemplateID = template.ID
|
||||
})
|
||||
coderdtest.AwaitTemplateVersionJob(t, client, updatedVersion.ID)
|
||||
err = client.UpdateActiveTemplateVersion(ctx, template.ID, codersdk.UpdateActiveTemplateVersion{
|
||||
ID: template.ActiveVersionID,
|
||||
ID: updatedVersion.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
wait("update active template version", nil)
|
||||
|
||||
cancel()
|
||||
// Build with the new template; should end up with a failure state.
|
||||
_ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart, func(req *codersdk.CreateWorkspaceBuildRequest) {
|
||||
req.TemplateVersionID = updatedVersion.ID
|
||||
})
|
||||
wait("workspace build pending", func(w codersdk.Workspace) bool {
|
||||
return w.LatestBuild.Status == codersdk.WorkspaceStatusPending
|
||||
})
|
||||
wait("workspace build failed", func(w codersdk.Workspace) bool {
|
||||
return w.LatestBuild.Status == codersdk.WorkspaceStatusFailed
|
||||
})
|
||||
}
|
||||
|
||||
func mustLocation(t *testing.T, location string) *time.Location {
|
||||
|
|
|
@ -891,7 +891,7 @@ func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto
|
|||
// will still be available for us to send the cancel to the provisioner
|
||||
stream, err := r.provisioner.Provision(ctx)
|
||||
if err != nil {
|
||||
return nil, r.failedJobf("provision: %s", err)
|
||||
return nil, r.failedWorkspaceBuildf("provision: %s", err)
|
||||
}
|
||||
defer stream.Close()
|
||||
go func() {
|
||||
|
@ -909,13 +909,13 @@ func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto
|
|||
|
||||
err = stream.Send(req)
|
||||
if err != nil {
|
||||
return nil, r.failedJobf("start provision: %s", err)
|
||||
return nil, r.failedWorkspaceBuildf("start provision: %s", err)
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return nil, r.failedJobf("recv workspace provision: %s", err)
|
||||
return nil, r.failedWorkspaceBuildf("recv workspace provision: %s", err)
|
||||
}
|
||||
switch msgType := msg.Type.(type) {
|
||||
case *sdkproto.Provision_Response_Log:
|
||||
|
@ -934,7 +934,7 @@ func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto
|
|||
})
|
||||
case *sdkproto.Provision_Response_Complete:
|
||||
if msgType.Complete.Error != "" {
|
||||
r.logger.Error(context.Background(), "provision failed; updating state",
|
||||
r.logger.Warn(context.Background(), "provision failed; updating state",
|
||||
slog.F("state_length", len(msgType.Complete.State)),
|
||||
slog.F("error", msgType.Complete.Error),
|
||||
)
|
||||
|
@ -958,7 +958,7 @@ func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto
|
|||
// Stop looping!
|
||||
return msgType.Complete, nil
|
||||
default:
|
||||
return nil, r.failedJobf("invalid message type %T received from provisioner", msg.Type)
|
||||
return nil, r.failedWorkspaceBuildf("invalid message type %T received from provisioner", msg.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1092,6 +1092,12 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (r *Runner) failedWorkspaceBuildf(format string, args ...interface{}) *proto.FailedJob {
|
||||
failedJob := r.failedJobf(format, args...)
|
||||
failedJob.Type = &proto.FailedJob_WorkspaceBuild_{}
|
||||
return failedJob
|
||||
}
|
||||
|
||||
func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob {
|
||||
message := fmt.Sprintf(format, args...)
|
||||
var code string
|
||||
|
|
Loading…
Reference in New Issue