mirror of https://github.com/coder/coder.git
fix(coderd): Detect agent disconnect via inactivity (#6528)
Fixes #5901
This commit is contained in:
parent
7fa6483d84
commit
179d9e0d24
|
@ -11,8 +11,10 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"runtime/pprof"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
@ -291,11 +293,12 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go httpapi.Heartbeat(ctx, conn)
|
|
||||||
|
|
||||||
_, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageBinary)
|
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageBinary)
|
||||||
defer wsNetConn.Close() // Also closes conn.
|
defer wsNetConn.Close() // Also closes conn.
|
||||||
|
|
||||||
|
go httpapi.Heartbeat(ctx, conn)
|
||||||
|
|
||||||
agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
|
agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
|
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
|
||||||
|
@ -606,11 +609,40 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go httpapi.Heartbeat(ctx, conn)
|
|
||||||
|
|
||||||
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageBinary)
|
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageBinary)
|
||||||
defer wsNetConn.Close()
|
defer wsNetConn.Close()
|
||||||
|
|
||||||
|
// We use a custom heartbeat routine here instead of `httpapi.Heartbeat`
|
||||||
|
// because we want to record the agent's last ping time (used to detect inactivity).
|
||||||
|
var lastPing time.Time
|
||||||
|
var pingMu sync.Mutex
|
||||||
|
go pprof.Do(ctx, pprof.Labels("agent", workspaceAgent.ID.String()), func(ctx context.Context) {
|
||||||
|
// TODO(mafredri): Is this too frequent? Use separate ping disconnect timeout?
|
||||||
|
t := time.NewTicker(api.AgentConnectionUpdateFrequency)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't need a context that times out here because the ping will
|
||||||
|
// eventually go through. If the context times out, then other
|
||||||
|
// websocket read operations will receive an error, obfuscating the
|
||||||
|
// actual problem.
|
||||||
|
err := conn.Ping(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pingMu.Lock()
|
||||||
|
lastPing = time.Now()
|
||||||
|
pingMu.Unlock()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
firstConnectedAt := workspaceAgent.FirstConnectedAt
|
firstConnectedAt := workspaceAgent.FirstConnectedAt
|
||||||
if !firstConnectedAt.Valid {
|
if !firstConnectedAt.Valid {
|
||||||
firstConnectedAt = sql.NullTime{
|
firstConnectedAt = sql.NullTime{
|
||||||
|
@ -654,9 +686,12 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
|
||||||
ctx, cancel := context.WithTimeout(dbauthz.AsSystemRestricted(api.ctx), api.AgentInactiveDisconnectTimeout)
|
ctx, cancel := context.WithTimeout(dbauthz.AsSystemRestricted(api.ctx), api.AgentInactiveDisconnectTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
disconnectedAt = sql.NullTime{
|
// Only update timestamp if the disconnect is new.
|
||||||
Time: database.Now(),
|
if !disconnectedAt.Valid {
|
||||||
Valid: true,
|
disconnectedAt = sql.NullTime{
|
||||||
|
Time: database.Now(),
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err := updateConnectionTimes(ctx)
|
err := updateConnectionTimes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -711,15 +746,37 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
lastConnectedAt = sql.NullTime{
|
|
||||||
Time: database.Now(),
|
pingMu.Lock()
|
||||||
Valid: true,
|
lastPing := lastPing
|
||||||
|
pingMu.Unlock()
|
||||||
|
|
||||||
|
var connectionStatusChanged bool
|
||||||
|
if time.Since(lastPing) > api.AgentInactiveDisconnectTimeout {
|
||||||
|
if !disconnectedAt.Valid {
|
||||||
|
connectionStatusChanged = true
|
||||||
|
disconnectedAt = sql.NullTime{
|
||||||
|
Time: database.Now(),
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
connectionStatusChanged = disconnectedAt.Valid
|
||||||
|
// TODO(mafredri): Should we update it here or allow lastConnectedAt to shadow it?
|
||||||
|
disconnectedAt = sql.NullTime{}
|
||||||
|
lastConnectedAt = sql.NullTime{
|
||||||
|
Time: database.Now(),
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = updateConnectionTimes(ctx)
|
err = updateConnectionTimes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = conn.Close(websocket.StatusGoingAway, err.Error())
|
_ = conn.Close(websocket.StatusGoingAway, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if connectionStatusChanged {
|
||||||
|
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
|
||||||
|
}
|
||||||
err := ensureLatestBuild()
|
err := ensureLatestBuild()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Disconnect agents that are no longer valid.
|
// Disconnect agents that are no longer valid.
|
||||||
|
|
Loading…
Reference in New Issue