Filter query: has-agent connecting, connected, disconnected, timeout (#5145)

* WIP

* has-agent:connecting, connected

* Fix

* Fix

* has-agent:disconnected, timeout

* Fix: typo

* Fix

* TODOs

* databasefake

* Fix: typo

* More TODOs

* databasefake

* Timeout tests

* Address PR comments

* Implement FIXMEs

* Renamings

* Address PR comments

* Fix: readability

* Fix: refactor CASE logic

* CASE logic

* Fix

* Use CTE

* Polishing

* Comment

* WIP

* IS NOT NULL

* Without CTE

* One more optimization

* 2nd optimization
This commit is contained in:
Marcin Tojek 2022-11-24 15:33:13 +01:00 committed by GitHub
parent 511bb469c4
commit 25da224513
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 336 additions and 27 deletions

View File

@ -871,6 +871,44 @@ func (q *fakeQuerier) GetAuthorizedWorkspaces(ctx context.Context, arg database.
}
}
if arg.HasAgent != "" {
build, err := q.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
if err != nil {
return nil, xerrors.Errorf("get latest build: %w", err)
}
job, err := q.GetProvisionerJobByID(ctx, build.JobID)
if err != nil {
return nil, xerrors.Errorf("get provisioner job: %w", err)
}
workspaceResources, err := q.GetWorkspaceResourcesByJobID(ctx, job.ID)
if err != nil {
return nil, xerrors.Errorf("get workspace resources: %w", err)
}
var workspaceResourceIDs []uuid.UUID
for _, wr := range workspaceResources {
workspaceResourceIDs = append(workspaceResourceIDs, wr.ID)
}
workspaceAgents, err := q.GetWorkspaceAgentsByResourceIDs(ctx, workspaceResourceIDs)
if err != nil {
return nil, xerrors.Errorf("get workspace agents: %w", err)
}
var hasAgentMatched bool
for _, wa := range workspaceAgents {
if mapAgentStatus(wa, arg.AgentInactiveDisconnectTimeoutSeconds) == arg.HasAgent {
hasAgentMatched = true
}
}
if !hasAgentMatched {
continue
}
}
if len(arg.TemplateIds) > 0 {
match := false
for _, id := range arg.TemplateIds {
@ -909,6 +947,40 @@ func (q *fakeQuerier) GetAuthorizedWorkspaces(ctx context.Context, arg database.
return convertToWorkspaceRows(workspaces, int64(beforePageCount)), nil
}
// mapAgentStatus determines the agent status based on different timestamps like created_at, last_connected_at, disconnected_at, etc.
// The function must be in sync with: coderd/workspaceagents.go:convertWorkspaceAgent.
func mapAgentStatus(dbAgent database.WorkspaceAgent, agentInactiveDisconnectTimeoutSeconds int64) string {
var status string
connectionTimeout := time.Duration(dbAgent.ConnectionTimeoutSeconds) * time.Second
switch {
case !dbAgent.FirstConnectedAt.Valid:
switch {
case connectionTimeout > 0 && database.Now().Sub(dbAgent.CreatedAt) > connectionTimeout:
// If the agent took too long to connect the first time,
// mark it as timed out.
status = "timeout"
default:
// If the agent never connected, it's waiting for the compute
// to start up.
status = "connecting"
}
case dbAgent.DisconnectedAt.Time.After(dbAgent.LastConnectedAt.Time):
// If we've disconnected after our last connection, we know the
// agent is no longer connected.
status = "disconnected"
case database.Now().Sub(dbAgent.LastConnectedAt.Time) > time.Duration(agentInactiveDisconnectTimeoutSeconds)*time.Second:
// The connection died without updating the last connected.
status = "disconnected"
case dbAgent.LastConnectedAt.Valid:
// The agent should be assumed connected if it's under inactivity timeouts
// and last connected at has been properly set.
status = "connected"
default:
panic("unknown agent status: " + status)
}
return status
}
func convertToWorkspaceRows(workspaces []database.Workspace, count int64) []database.GetWorkspacesRow {
rows := make([]database.GetWorkspacesRow, len(workspaces))
for i, w := range workspaces {

View File

@ -132,6 +132,8 @@ func (q *sqlQuerier) GetAuthorizedWorkspaces(ctx context.Context, arg GetWorkspa
arg.TemplateName,
pq.Array(arg.TemplateIds),
arg.Name,
arg.HasAgent,
arg.AgentInactiveDisconnectTimeoutSeconds,
arg.Offset,
arg.Limit,
)

View File

@ -6272,6 +6272,7 @@ FROM
LEFT JOIN LATERAL (
SELECT
workspace_builds.transition,
provisioner_jobs.id AS provisioner_job_id,
provisioner_jobs.started_at,
provisioner_jobs.updated_at,
provisioner_jobs.canceled_at,
@ -6373,7 +6374,7 @@ WHERE
-- Use the organization filter to restrict to 1 org if needed.
AND CASE
WHEN $5 :: text != '' THEN
template_id = ANY(SELECT id FROM templates WHERE lower(name) = lower($5) AND deleted = false)
template_id = ANY(SELECT id FROM templates WHERE lower(name) = lower($5) AND deleted = false)
ELSE true
END
-- Filter by template_ids
@ -6388,29 +6389,68 @@ WHERE
name ILIKE '%' || $7 || '%'
ELSE true
END
-- Filter by agent status
-- has-agent: is only applicable for workspaces in "start" transition. Stopped and deleted workspaces don't have agents.
AND CASE
WHEN $8 :: text != '' THEN
(
SELECT COUNT(*)
FROM
workspace_resources
JOIN
workspace_agents
ON
workspace_agents.resource_id = workspace_resources.id
WHERE
workspace_resources.job_id = latest_build.provisioner_job_id AND
latest_build.transition = 'start'::workspace_transition AND
$8 = (
CASE
WHEN workspace_agents.first_connected_at IS NULL THEN
CASE
WHEN workspace_agents.connection_timeout_seconds > 0 AND NOW() - workspace_agents.created_at > workspace_agents.connection_timeout_seconds * INTERVAL '1 second' THEN
'timeout'
ELSE
'connecting'
END
WHEN workspace_agents.disconnected_at > workspace_agents.last_connected_at THEN
'disconnected'
WHEN NOW() - workspace_agents.last_connected_at > INTERVAL '1 second' * $9 :: bigint THEN
'disconnected'
WHEN workspace_agents.last_connected_at IS NOT NULL THEN
'connected'
ELSE
NULL
END
)
) > 0
ELSE true
END
-- Authorize Filter clause will be injected below in GetAuthorizedWorkspaces
-- @authorize_filter
ORDER BY
last_used_at DESC
last_used_at DESC
LIMIT
CASE
WHEN $9 :: integer > 0 THEN
$9
END
CASE
WHEN $11 :: integer > 0 THEN
$11
END
OFFSET
$8
$10
`
type GetWorkspacesParams struct {
Deleted bool `db:"deleted" json:"deleted"`
Status string `db:"status" json:"status"`
OwnerID uuid.UUID `db:"owner_id" json:"owner_id"`
OwnerUsername string `db:"owner_username" json:"owner_username"`
TemplateName string `db:"template_name" json:"template_name"`
TemplateIds []uuid.UUID `db:"template_ids" json:"template_ids"`
Name string `db:"name" json:"name"`
Offset int32 `db:"offset_" json:"offset_"`
Limit int32 `db:"limit_" json:"limit_"`
Deleted bool `db:"deleted" json:"deleted"`
Status string `db:"status" json:"status"`
OwnerID uuid.UUID `db:"owner_id" json:"owner_id"`
OwnerUsername string `db:"owner_username" json:"owner_username"`
TemplateName string `db:"template_name" json:"template_name"`
TemplateIds []uuid.UUID `db:"template_ids" json:"template_ids"`
Name string `db:"name" json:"name"`
HasAgent string `db:"has_agent" json:"has_agent"`
AgentInactiveDisconnectTimeoutSeconds int64 `db:"agent_inactive_disconnect_timeout_seconds" json:"agent_inactive_disconnect_timeout_seconds"`
Offset int32 `db:"offset_" json:"offset_"`
Limit int32 `db:"limit_" json:"limit_"`
}
type GetWorkspacesRow struct {
@ -6437,6 +6477,8 @@ func (q *sqlQuerier) GetWorkspaces(ctx context.Context, arg GetWorkspacesParams)
arg.TemplateName,
pq.Array(arg.TemplateIds),
arg.Name,
arg.HasAgent,
arg.AgentInactiveDisconnectTimeoutSeconds,
arg.Offset,
arg.Limit,
)

View File

@ -45,6 +45,7 @@ FROM
LEFT JOIN LATERAL (
SELECT
workspace_builds.transition,
provisioner_jobs.id AS provisioner_job_id,
provisioner_jobs.started_at,
provisioner_jobs.updated_at,
provisioner_jobs.canceled_at,
@ -146,7 +147,7 @@ WHERE
-- Use the organization filter to restrict to 1 org if needed.
AND CASE
WHEN @template_name :: text != '' THEN
template_id = ANY(SELECT id FROM templates WHERE lower(name) = lower(@template_name) AND deleted = false)
template_id = ANY(SELECT id FROM templates WHERE lower(name) = lower(@template_name) AND deleted = false)
ELSE true
END
-- Filter by template_ids
@ -161,17 +162,54 @@ WHERE
name ILIKE '%' || @name || '%'
ELSE true
END
-- Filter by agent status
-- has-agent: is only applicable for workspaces in "start" transition. Stopped and deleted workspaces don't have agents.
AND CASE
WHEN @has_agent :: text != '' THEN
(
SELECT COUNT(*)
FROM
workspace_resources
JOIN
workspace_agents
ON
workspace_agents.resource_id = workspace_resources.id
WHERE
workspace_resources.job_id = latest_build.provisioner_job_id AND
latest_build.transition = 'start'::workspace_transition AND
@has_agent = (
CASE
WHEN workspace_agents.first_connected_at IS NULL THEN
CASE
WHEN workspace_agents.connection_timeout_seconds > 0 AND NOW() - workspace_agents.created_at > workspace_agents.connection_timeout_seconds * INTERVAL '1 second' THEN
'timeout'
ELSE
'connecting'
END
WHEN workspace_agents.disconnected_at > workspace_agents.last_connected_at THEN
'disconnected'
WHEN NOW() - workspace_agents.last_connected_at > INTERVAL '1 second' * @agent_inactive_disconnect_timeout_seconds :: bigint THEN
'disconnected'
WHEN workspace_agents.last_connected_at IS NOT NULL THEN
'connected'
ELSE
NULL
END
)
) > 0
ELSE true
END
-- Authorize Filter clause will be injected below in GetAuthorizedWorkspaces
-- @authorize_filter
ORDER BY
last_used_at DESC
last_used_at DESC
LIMIT
CASE
WHEN @limit_ :: integer > 0 THEN
@limit_
END
CASE
WHEN @limit_ :: integer > 0 THEN
@limit_
END
OFFSET
@offset_
@offset_
;
-- name: GetWorkspaceByOwnerIDAndName :one

View File

@ -104,7 +104,7 @@ func (api *API) workspaces(rw http.ResponseWriter, r *http.Request) {
}
queryStr := r.URL.Query().Get("q")
filter, errs := workspaceSearchQuery(queryStr, page)
filter, errs := workspaceSearchQuery(queryStr, page, api.AgentInactiveDisconnectTimeout)
if len(errs) > 0 {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Invalid workspace search query.",
@ -1091,8 +1091,10 @@ func validWorkspaceSchedule(s *string) (sql.NullString, error) {
// workspaceSearchQuery takes a query string and returns the workspace filter.
// It also can return the list of validation errors to return to the api.
func workspaceSearchQuery(query string, page codersdk.Pagination) (database.GetWorkspacesParams, []codersdk.ValidationError) {
func workspaceSearchQuery(query string, page codersdk.Pagination, agentInactiveDisconnectTimeout time.Duration) (database.GetWorkspacesParams, []codersdk.ValidationError) {
filter := database.GetWorkspacesParams{
AgentInactiveDisconnectTimeoutSeconds: int64(agentInactiveDisconnectTimeout.Seconds()),
Offset: int32(page.Offset),
Limit: int32(page.Limit),
}
@ -1139,7 +1141,7 @@ func workspaceSearchQuery(query string, page codersdk.Pagination) (database.GetW
filter.TemplateName = parser.String(searchParams, "", "template")
filter.Name = parser.String(searchParams, "", "name")
filter.Status = parser.String(searchParams, "", "status")
filter.HasAgent = parser.String(searchParams, "", "has-agent")
return filter, parser.Errors
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"testing"
"time"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/codersdk"
@ -136,7 +137,7 @@ func TestSearchWorkspace(t *testing.T) {
c := c
t.Run(c.Name, func(t *testing.T) {
t.Parallel()
values, errs := workspaceSearchQuery(c.Query, codersdk.Pagination{})
values, errs := workspaceSearchQuery(c.Query, codersdk.Pagination{}, 0)
if c.ExpectedErrorContains != "" {
require.True(t, len(errs) > 0, "expect some errors")
var s strings.Builder
@ -150,4 +151,13 @@ func TestSearchWorkspace(t *testing.T) {
}
})
}
t.Run("AgentInactiveDisconnectTimeout", func(t *testing.T) {
t.Parallel()
query := `foo:bar`
timeout := 1337 * time.Second
values, errs := workspaceSearchQuery(query, codersdk.Pagination{}, timeout)
require.Empty(t, errs)
require.Equal(t, int64(timeout.Seconds()), values.AgentInactiveDisconnectTimeoutSeconds)
})
}

View File

@ -824,6 +824,149 @@ func TestWorkspaceFilterManual(t *testing.T) {
require.Len(t, res.Workspaces, 1)
require.Equal(t, workspace.ID, res.Workspaces[0].ID)
})
t.Run("FilterQueryHasAgentConnecting", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
res, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
FilterQuery: fmt.Sprintf("has-agent:%s", "connecting"),
})
require.NoError(t, err)
require.Len(t, res.Workspaces, 1)
require.Equal(t, workspace.ID, res.Workspaces[0].ID)
})
t.Run("FilterQueryHasAgentConnected", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
agentClient := codersdk.New(client.URL)
agentClient.SetSessionToken(authToken)
agentCloser := agent.New(agent.Options{
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
})
defer func() {
_ = agentCloser.Close()
}()
coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
res, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
FilterQuery: fmt.Sprintf("has-agent:%s", "connected"),
})
require.NoError(t, err)
require.Len(t, res.Workspaces, 1)
require.Equal(t, workspace.ID, res.Workspaces[0].ID)
})
t.Run("FilterQueryHasAgentTimeout", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
ConnectionTimeoutSeconds: 1,
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()
testutil.Eventually(ctx, t, func(ctx context.Context) (done bool) {
workspaces, err := client.Workspaces(ctx, codersdk.WorkspaceFilter{
FilterQuery: fmt.Sprintf("has-agent:%s", "timeout"),
})
require.NoError(t, err)
return workspaces.Count == 1
}, testutil.IntervalMedium, "agent status timeout")
})
}
func TestOffsetLimit(t *testing.T) {