feat: add debug info to HA coordinator (#5883)

This commit is contained in:
Colin Adler 2023-01-26 16:32:38 -06:00 committed by GitHub
parent 52ecd35c8f
commit cc694a55bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 192 additions and 135 deletions

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"cdr.dev/slog" "cdr.dev/slog"
@ -24,6 +25,12 @@ import (
// that uses PostgreSQL pubsub to exchange handshakes. // that uses PostgreSQL pubsub to exchange handshakes.
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) { func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
nameCache, err := lru.New[uuid.UUID, string](512)
if err != nil {
panic("make lru cache: " + err.Error())
}
coord := &haCoordinator{ coord := &haCoordinator{
id: uuid.New(), id: uuid.New(),
log: logger, log: logger,
@ -31,8 +38,9 @@ func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinato
closeFunc: cancelFunc, closeFunc: cancelFunc,
close: make(chan struct{}), close: make(chan struct{}),
nodes: map[uuid.UUID]*agpl.Node{}, nodes: map[uuid.UUID]*agpl.Node{},
agentSockets: map[uuid.UUID]net.Conn{}, agentSockets: map[uuid.UUID]*agpl.TrackedConn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{}, agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn{},
agentNameCache: nameCache,
} }
if err := coord.runPubsub(ctx); err != nil { if err := coord.runPubsub(ctx); err != nil {
@ -53,10 +61,14 @@ type haCoordinator struct {
// nodes maps agent and connection IDs their respective node. // nodes maps agent and connection IDs their respective node.
nodes map[uuid.UUID]*agpl.Node nodes map[uuid.UUID]*agpl.Node
// agentSockets maps agent IDs to their open websocket. // agentSockets maps agent IDs to their open websocket.
agentSockets map[uuid.UUID]net.Conn agentSockets map[uuid.UUID]*agpl.TrackedConn
// agentToConnectionSockets maps agent IDs to connection IDs of conns that // agentToConnectionSockets maps agent IDs to connection IDs of conns that
// are subscribed to updates for that agent. // are subscribed to updates for that agent.
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]net.Conn agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn
// agentNameCache holds a cache of agent names. If one of them disappears,
// it's helpful to have a name cached for debugging.
agentNameCache *lru.Cache[uuid.UUID, string]
} }
// Node returns an in-memory node by ID. // Node returns an in-memory node by ID.
@ -94,12 +106,18 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
c.mutex.Lock() c.mutex.Lock()
connectionSockets, ok := c.agentToConnectionSockets[agent] connectionSockets, ok := c.agentToConnectionSockets[agent]
if !ok { if !ok {
connectionSockets = map[uuid.UUID]net.Conn{} connectionSockets = map[uuid.UUID]*agpl.TrackedConn{}
c.agentToConnectionSockets[agent] = connectionSockets c.agentToConnectionSockets[agent] = connectionSockets
} }
// Insert this connection into a map so the agent can publish node updates. now := time.Now().Unix()
connectionSockets[id] = conn // Insert this connection into a map so the agent
// can publish node updates.
connectionSockets[id] = &agpl.TrackedConn{
Conn: conn,
Start: now,
LastWrite: now,
}
c.mutex.Unlock() c.mutex.Unlock()
defer func() { defer func() {
@ -176,7 +194,9 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
// ServeAgent accepts a WebSocket connection to an agent that listens to // ServeAgent accepts a WebSocket connection to an agent that listens to
// incoming connections and publishes node updates. // incoming connections and publishes node updates.
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error { func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
c.agentNameCache.Add(id, name)
// Tell clients on other instances to send a callmemaybe to us. // Tell clients on other instances to send a callmemaybe to us.
err := c.publishAgentHello(id) err := c.publishAgentHello(id)
if err != nil { if err != nil {
@ -196,21 +216,41 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error
} }
} }
// This uniquely identifies a connection that belongs to this goroutine.
unique := uuid.New()
now := time.Now().Unix()
overwrites := int64(0)
// If an old agent socket is connected, we close it // If an old agent socket is connected, we close it
// to avoid any leaks. This shouldn't ever occur because // to avoid any leaks. This shouldn't ever occur because
// we expect one agent to be running. // we expect one agent to be running.
c.mutex.Lock() c.mutex.Lock()
oldAgentSocket, ok := c.agentSockets[id] oldAgentSocket, ok := c.agentSockets[id]
if ok { if ok {
overwrites = oldAgentSocket.Overwrites + 1
_ = oldAgentSocket.Close() _ = oldAgentSocket.Close()
} }
c.agentSockets[id] = conn c.agentSockets[id] = &agpl.TrackedConn{
ID: unique,
Conn: conn,
Name: name,
Start: now,
LastWrite: now,
Overwrites: overwrites,
}
c.mutex.Unlock() c.mutex.Unlock()
defer func() { defer func() {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
delete(c.agentSockets, id)
delete(c.nodes, id) // Only delete the connection if it's ours. It could have been
// overwritten.
if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
delete(c.agentSockets, id)
delete(c.nodes, id)
}
}() }()
decoder := json.NewDecoder(conn) decoder := json.NewDecoder(conn)
@ -576,8 +616,14 @@ func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte
return buf.Bytes(), nil return buf.Bytes(), nil
} }
func (*haCoordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) { func (c *haCoordinator) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h1>coordinator</h1>")
fmt.Fprintf(w, "<h2>ha debug coming soon</h2>") c.mutex.RLock()
defer c.mutex.RUnlock()
fmt.Fprintln(w, "<h1>high-availability wireguard coordinator debug</h1>")
fmt.Fprintln(w, "<h4 style=\"margin-top:-25px\">warning: this only provides info from the node that served the request, if there are multiple replicas this data may be incomplete</h4>")
agpl.CoordinatorHTTPDebug(c.agentSockets, c.agentToConnectionSockets, c.agentNameCache)(w, r)
} }

View File

@ -110,7 +110,7 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
// coordinator is incompatible with multiple Coder replicas as all node data is // coordinator is incompatible with multiple Coder replicas as all node data is
// in-memory. // in-memory.
func NewCoordinator() Coordinator { func NewCoordinator() Coordinator {
cache, err := lru.New[uuid.UUID, string](512) nameCache, err := lru.New[uuid.UUID, string](512)
if err != nil { if err != nil {
panic("make lru cache: " + err.Error()) panic("make lru cache: " + err.Error())
} }
@ -118,9 +118,9 @@ func NewCoordinator() Coordinator {
return &coordinator{ return &coordinator{
closed: false, closed: false,
nodes: map[uuid.UUID]*Node{}, nodes: map[uuid.UUID]*Node{},
agentSockets: map[uuid.UUID]*trackedConn{}, agentSockets: map[uuid.UUID]*TrackedConn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*trackedConn{}, agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*TrackedConn{},
agentNameCache: cache, agentNameCache: nameCache,
} }
} }
@ -138,31 +138,31 @@ type coordinator struct {
// nodes maps agent and connection IDs their respective node. // nodes maps agent and connection IDs their respective node.
nodes map[uuid.UUID]*Node nodes map[uuid.UUID]*Node
// agentSockets maps agent IDs to their open websocket. // agentSockets maps agent IDs to their open websocket.
agentSockets map[uuid.UUID]*trackedConn agentSockets map[uuid.UUID]*TrackedConn
// agentToConnectionSockets maps agent IDs to connection IDs of conns that // agentToConnectionSockets maps agent IDs to connection IDs of conns that
// are subscribed to updates for that agent. // are subscribed to updates for that agent.
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*trackedConn agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*TrackedConn
// agentNameCache holds a cache of agent names. If one of them disappears, // agentNameCache holds a cache of agent names. If one of them disappears,
// it's helpful to have a name cached for debugging. // it's helpful to have a name cached for debugging.
agentNameCache *lru.Cache[uuid.UUID, string] agentNameCache *lru.Cache[uuid.UUID, string]
} }
type trackedConn struct { type TrackedConn struct {
net.Conn net.Conn
// id is an ephemeral UUID used to uniquely identify the owner of the // ID is an ephemeral UUID used to uniquely identify the owner of the
// connection. // connection.
id uuid.UUID ID uuid.UUID
name string Name string
start int64 Start int64
lastWrite int64 LastWrite int64
overwrites int64 Overwrites int64
} }
func (t *trackedConn) Write(b []byte) (n int, err error) { func (t *TrackedConn) Write(b []byte) (n int, err error) {
atomic.StoreInt64(&t.lastWrite, time.Now().Unix()) atomic.StoreInt64(&t.LastWrite, time.Now().Unix())
return t.Conn.Write(b) return t.Conn.Write(b)
} }
@ -212,17 +212,17 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
c.mutex.Lock() c.mutex.Lock()
connectionSockets, ok := c.agentToConnectionSockets[agent] connectionSockets, ok := c.agentToConnectionSockets[agent]
if !ok { if !ok {
connectionSockets = map[uuid.UUID]*trackedConn{} connectionSockets = map[uuid.UUID]*TrackedConn{}
c.agentToConnectionSockets[agent] = connectionSockets c.agentToConnectionSockets[agent] = connectionSockets
} }
now := time.Now().Unix() now := time.Now().Unix()
// Insert this connection into a map so the agent // Insert this connection into a map so the agent
// can publish node updates. // can publish node updates.
connectionSockets[id] = &trackedConn{ connectionSockets[id] = &TrackedConn{
Conn: conn, Conn: conn,
start: now, Start: now,
lastWrite: now, LastWrite: now,
} }
c.mutex.Unlock() c.mutex.Unlock()
defer func() { defer func() {
@ -337,17 +337,17 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
// dead. // dead.
oldAgentSocket, ok := c.agentSockets[id] oldAgentSocket, ok := c.agentSockets[id]
if ok { if ok {
overwrites = oldAgentSocket.overwrites + 1 overwrites = oldAgentSocket.Overwrites + 1
_ = oldAgentSocket.Close() _ = oldAgentSocket.Close()
} }
c.agentSockets[id] = &trackedConn{ c.agentSockets[id] = &TrackedConn{
id: unique, ID: unique,
Conn: conn, Conn: conn,
name: name, Name: name,
start: now, Start: now,
lastWrite: now, LastWrite: now,
overwrites: overwrites, Overwrites: overwrites,
} }
c.mutex.Unlock() c.mutex.Unlock()
@ -357,7 +357,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
// Only delete the connection if it's ours. It could have been // Only delete the connection if it's ours. It could have been
// overwritten. // overwritten.
if idConn, ok := c.agentSockets[id]; ok && idConn.id == unique { if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
delete(c.agentSockets, id) delete(c.agentSockets, id)
delete(c.nodes, id) delete(c.nodes, id)
} }
@ -450,123 +450,134 @@ func (c *coordinator) Close() error {
return nil return nil
} }
func (c *coordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) { func (c *coordinator) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
now := time.Now()
c.mutex.RLock() c.mutex.RLock()
defer c.mutex.RUnlock() defer c.mutex.RUnlock()
fmt.Fprintln(w, "<h1>in-memory wireguard coordinator debug</h1>") fmt.Fprintln(w, "<h1>in-memory wireguard coordinator debug</h1>")
type idConn struct { CoordinatorHTTPDebug(c.agentSockets, c.agentToConnectionSockets, c.agentNameCache)(w, r)
id uuid.UUID }
conn *trackedConn
}
{ func CoordinatorHTTPDebug(
fmt.Fprintf(w, "<h2 id=agents><a href=#agents>#</a> agents: total %d</h2>\n", len(c.agentSockets)) agentSocketsMap map[uuid.UUID]*TrackedConn,
fmt.Fprintln(w, "<ul>") agentToConnectionSocketsMap map[uuid.UUID]map[uuid.UUID]*TrackedConn,
agentSockets := make([]idConn, 0, len(c.agentSockets)) agentNameCache *lru.Cache[uuid.UUID, string],
) func(w http.ResponseWriter, _ *http.Request) {
return func(w http.ResponseWriter, _ *http.Request) {
now := time.Now()
for id, conn := range c.agentSockets { type idConn struct {
agentSockets = append(agentSockets, idConn{id, conn}) id uuid.UUID
conn *TrackedConn
} }
slices.SortFunc(agentSockets, func(a, b idConn) bool { {
return a.conn.name < b.conn.name fmt.Fprintf(w, "<h2 id=agents><a href=#agents>#</a> agents: total %d</h2>\n", len(agentSocketsMap))
}) fmt.Fprintln(w, "<ul>")
agentSockets := make([]idConn, 0, len(agentSocketsMap))
for _, agent := range agentSockets { for id, conn := range agentSocketsMap {
fmt.Fprintf(w, "<li style=\"margin-top:4px\"><b>%s</b> (<code>%s</code>): created %v ago, write %v ago, overwrites %d </li>\n", agentSockets = append(agentSockets, idConn{id, conn})
agent.conn.name, }
agent.id.String(),
now.Sub(time.Unix(agent.conn.start, 0)).Round(time.Second),
now.Sub(time.Unix(agent.conn.lastWrite, 0)).Round(time.Second),
agent.conn.overwrites,
)
if conns := c.agentToConnectionSockets[agent.id]; len(conns) > 0 { slices.SortFunc(agentSockets, func(a, b idConn) bool {
fmt.Fprintf(w, "<h3 style=\"margin:0px;font-size:16px;font-weight:400\">connections: total %d</h3>\n", len(conns)) return a.conn.Name < b.conn.Name
})
connSockets := make([]idConn, 0, len(conns)) for _, agent := range agentSockets {
for id, conn := range conns { fmt.Fprintf(w, "<li style=\"margin-top:4px\"><b>%s</b> (<code>%s</code>): created %v ago, write %v ago, overwrites %d </li>\n",
connSockets = append(connSockets, idConn{id, conn}) agent.conn.Name,
agent.id.String(),
now.Sub(time.Unix(agent.conn.Start, 0)).Round(time.Second),
now.Sub(time.Unix(agent.conn.LastWrite, 0)).Round(time.Second),
agent.conn.Overwrites,
)
if conns := agentToConnectionSocketsMap[agent.id]; len(conns) > 0 {
fmt.Fprintf(w, "<h3 style=\"margin:0px;font-size:16px;font-weight:400\">connections: total %d</h3>\n", len(conns))
connSockets := make([]idConn, 0, len(conns))
for id, conn := range conns {
connSockets = append(connSockets, idConn{id, conn})
}
slices.SortFunc(connSockets, func(a, b idConn) bool {
return a.id.String() < b.id.String()
})
fmt.Fprintln(w, "<ul>")
for _, connSocket := range connSockets {
fmt.Fprintf(w, "<li><b>%s</b> (<code>%s</code>): created %v ago, write %v ago </li>\n",
connSocket.conn.Name,
connSocket.id.String(),
now.Sub(time.Unix(connSocket.conn.Start, 0)).Round(time.Second),
now.Sub(time.Unix(connSocket.conn.LastWrite, 0)).Round(time.Second),
)
}
fmt.Fprintln(w, "</ul>")
} }
slices.SortFunc(connSockets, func(a, b idConn) bool { }
return a.id.String() < b.id.String()
})
fmt.Fprintln(w, "</ul>")
}
{
type agentConns struct {
id uuid.UUID
conns []idConn
}
missingAgents := []agentConns{}
for agentID, conns := range agentToConnectionSocketsMap {
if len(conns) == 0 {
continue
}
if _, ok := agentSocketsMap[agentID]; !ok {
connsSlice := make([]idConn, 0, len(conns))
for id, conn := range conns {
connsSlice = append(connsSlice, idConn{id, conn})
}
slices.SortFunc(connsSlice, func(a, b idConn) bool {
return a.id.String() < b.id.String()
})
missingAgents = append(missingAgents, agentConns{agentID, connsSlice})
}
}
slices.SortFunc(missingAgents, func(a, b agentConns) bool {
return a.id.String() < b.id.String()
})
fmt.Fprintf(w, "<h2 id=missing-agents><a href=#missing-agents>#</a> missing agents: total %d</h2>\n", len(missingAgents))
fmt.Fprintln(w, "<ul>")
for _, agentConns := range missingAgents {
agentName, ok := agentNameCache.Get(agentConns.id)
if !ok {
agentName = "unknown"
}
fmt.Fprintf(w, "<li style=\"margin-top:4px\"><b>%s</b> (<code>%s</code>): created ? ago, write ? ago, overwrites ? </li>\n",
agentName,
agentConns.id.String(),
)
fmt.Fprintf(w, "<h3 style=\"margin:0px;font-size:16px;font-weight:400\">connections: total %d</h3>\n", len(agentConns.conns))
fmt.Fprintln(w, "<ul>") fmt.Fprintln(w, "<ul>")
for _, connSocket := range connSockets { for _, agentConn := range agentConns.conns {
fmt.Fprintf(w, "<li><b>%s</b> (<code>%s</code>): created %v ago, write %v ago </li>\n", fmt.Fprintf(w, "<li><b>%s</b> (<code>%s</code>): created %v ago, write %v ago </li>\n",
connSocket.conn.name, agentConn.conn.Name,
connSocket.id.String(), agentConn.id.String(),
now.Sub(time.Unix(connSocket.conn.start, 0)).Round(time.Second), now.Sub(time.Unix(agentConn.conn.Start, 0)).Round(time.Second),
now.Sub(time.Unix(connSocket.conn.lastWrite, 0)).Round(time.Second), now.Sub(time.Unix(agentConn.conn.LastWrite, 0)).Round(time.Second),
) )
} }
fmt.Fprintln(w, "</ul>") fmt.Fprintln(w, "</ul>")
} }
}
fmt.Fprintln(w, "</ul>")
}
{
type agentConns struct {
id uuid.UUID
conns []idConn
}
missingAgents := []agentConns{}
for agentID, conns := range c.agentToConnectionSockets {
if len(conns) == 0 {
continue
}
if _, ok := c.agentSockets[agentID]; !ok {
connsSlice := make([]idConn, 0, len(conns))
for id, conn := range conns {
connsSlice = append(connsSlice, idConn{id, conn})
}
slices.SortFunc(connsSlice, func(a, b idConn) bool {
return a.id.String() < b.id.String()
})
missingAgents = append(missingAgents, agentConns{agentID, connsSlice})
}
}
slices.SortFunc(missingAgents, func(a, b agentConns) bool {
return a.id.String() < b.id.String()
})
fmt.Fprintf(w, "<h2 id=missing-agents><a href=#missing-agents>#</a> missing agents: total %d</h2>\n", len(missingAgents))
fmt.Fprintln(w, "<ul>")
for _, agentConns := range missingAgents {
agentName, ok := c.agentNameCache.Get(agentConns.id)
if !ok {
agentName = "unknown"
}
fmt.Fprintf(w, "<li style=\"margin-top:4px\"><b>%s</b> (<code>%s</code>): created ? ago, write ? ago, overwrites ? </li>\n",
agentName,
agentConns.id.String(),
)
fmt.Fprintf(w, "<h3 style=\"margin:0px;font-size:16px;font-weight:400\">connections: total %d</h3>\n", len(agentConns.conns))
fmt.Fprintln(w, "<ul>")
for _, agentConn := range agentConns.conns {
fmt.Fprintf(w, "<li><b>%s</b> (<code>%s</code>): created %v ago, write %v ago </li>\n",
agentConn.conn.name,
agentConn.id.String(),
now.Sub(time.Unix(agentConn.conn.start, 0)).Round(time.Second),
now.Sub(time.Unix(agentConn.conn.lastWrite, 0)).Round(time.Second),
)
}
fmt.Fprintln(w, "</ul>") fmt.Fprintln(w, "</ul>")
} }
fmt.Fprintln(w, "</ul>")
} }
} }