mirror of https://github.com/coder/coder.git
chore: tailnet debug logging (#7260)
* Enable discovery (disco) debug
* Better debug on reconnectingPTY
* Agent logging in apptest
* More reconnectingPTY logging
* Add logging to coordinator
* Update agent/agent.go (four review suggestions)
* Clarify logs; remove unrelated changes

Signed-off-by: Spike Curtis <spike@coder.com>
Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
parent 7aa07cfc8d
commit b6666cf1cf
@@ -301,6 +301,7 @@ jobs:
           echo "cover=false" >> $GITHUB_OUTPUT
         fi

+        export TS_DEBUG_DISCO=true
         gotestsum --junitfile="gotests.xml" --jsonfile="gotests.json" --packages="./..." -- -parallel=8 -timeout=7m -short -failfast $COVERAGE_FLAGS

       - name: Print test stats

@@ -377,6 +378,7 @@ jobs:

       - name: Test with PostgreSQL Database
         run: |
+          export TS_DEBUG_DISCO=true
           make test-postgres

       - name: Print test stats
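Both CI hunks export `TS_DEBUG_DISCO=true` before the test run so the embedded Tailscale code prints verbose disco (peer discovery / NAT traversal) logs. A hedged sketch of flipping the same knob from inside a single Go test; whether it takes effect depends on when the embedded code reads the variable, which is presumably why CI sets it at the shell level instead:

```go
package ci_test

import "testing"

// TestWithDiscoDebug is a hypothetical test showing the knob the CI change
// flips. TS_DEBUG_DISCO is consulted by the embedded Tailscale networking
// code; if it is read once at process start, setting it here may be too
// late, hence CI exporting it in the shell before gotestsum runs.
func TestWithDiscoDebug(t *testing.T) {
	t.Setenv("TS_DEBUG_DISCO", "true") // t.Setenv also forbids t.Parallel in this test.
	// ... construct the tailnet under test and run the scenario ...
}
```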
@@ -653,6 +653,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 			}
 			break
 		}
+		logger.Debug(ctx, "accepted conn", slog.F("remote", conn.RemoteAddr().String()))
 		wg.Add(1)
 		closed := make(chan struct{})
 		go func() {

@@ -681,6 +682,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 			var msg codersdk.WorkspaceAgentReconnectingPTYInit
 			err = json.Unmarshal(data, &msg)
 			if err != nil {
+				logger.Warn(ctx, "failed to unmarshal init", slog.F("raw", data))
 				return
 			}
 			_ = a.handleReconnectingPTY(ctx, logger, msg, conn)

@@ -972,6 +974,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m

 	connectionID := uuid.NewString()
 	logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID))
+	logger.Debug(ctx, "starting handler")

 	defer func() {
 		if err := retErr; err != nil {

@@ -1039,6 +1042,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 		// 1. The timeout completed.
 		// 2. The parent context was canceled.
 		<-ctx.Done()
+		logger.Debug(ctx, "context done", slog.Error(ctx.Err()))
 		_ = process.Kill()
 	}()
 	// We don't need to separately monitor for the process exiting.

@@ -1050,6 +1054,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 		read, err := rpty.ptty.OutputReader().Read(buffer)
 		if err != nil {
 			// When the PTY is closed, this is triggered.
+			// Error is typically a benign EOF, so only log for debugging.
+			logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err))
 			break
 		}
 		part := buffer[:read]

@@ -1061,8 +1067,15 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 			break
 		}
 		rpty.activeConnsMutex.Lock()
-		for _, conn := range rpty.activeConns {
-			_, _ = conn.Write(part)
+		for cid, conn := range rpty.activeConns {
+			_, err = conn.Write(part)
+			if err != nil {
+				logger.Debug(ctx,
+					"error writing to active conn",
+					slog.F("other_conn_id", cid),
+					slog.Error(err),
+				)
+			}
 		}
 		rpty.activeConnsMutex.Unlock()
 	}
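The last agent hunk upgrades a fire-and-forget write loop into one that reports which attached connection failed. The same pattern extracted as a standalone sketch; the map shape and function name are illustrative stand-ins for the agent's internal `activeConns` state, while the log message and fields mirror the diff:

```go
package rptydemo

import (
	"context"
	"net"
	"sync"

	"cdr.dev/slog"
)

// writeToActiveConns fans one chunk of PTY output out to every attached
// connection and logs, rather than silently drops, per-connection write
// errors, keyed by the failing connection's id.
func writeToActiveConns(ctx context.Context, logger slog.Logger, mu *sync.Mutex, conns map[string]net.Conn, part []byte) {
	mu.Lock()
	defer mu.Unlock()
	for cid, conn := range conns {
		if _, err := conn.Write(part); err != nil {
			logger.Debug(ctx,
				"error writing to active conn",
				slog.F("other_conn_id", cid),
				slog.Error(err),
			)
		}
	}
}
```

Keeping the failure at debug level matches the diff's intent: a dropped mirror connection is expected churn, not an operator-facing error.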
@@ -879,6 +879,7 @@ func TestAgent_StartupScript(t *testing.T) {
 	}
 	t.Run("Success", func(t *testing.T) {
 		t.Parallel()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		client := &client{
 			t: t,
 			agentID: uuid.New(),

@@ -887,12 +888,12 @@ func TestAgent_StartupScript(t *testing.T) {
 				DERPMap: &tailcfg.DERPMap{},
 			},
 			statsChan: make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}
 		closer := agent.New(agent.Options{
 			Client: client,
 			Filesystem: afero.NewMemMapFs(),
-			Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+			Logger: logger.Named("agent"),
 			ReconnectingPTYTimeout: 0,
 		})
 		t.Cleanup(func() {

@@ -910,6 +911,7 @@ func TestAgent_StartupScript(t *testing.T) {
 	// script has written too many lines it will still succeed!
 	t.Run("OverflowsAndSkips", func(t *testing.T) {
 		t.Parallel()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		client := &client{
 			t: t,
 			agentID: uuid.New(),

@@ -927,12 +929,12 @@ func TestAgent_StartupScript(t *testing.T) {
 				return codersdk.ReadBodyAsError(res)
 			},
 			statsChan: make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}
 		closer := agent.New(agent.Options{
 			Client: client,
 			Filesystem: afero.NewMemMapFs(),
-			Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+			Logger: logger.Named("agent"),
 			ReconnectingPTYTimeout: 0,
 		})
 		t.Cleanup(func() {

@@ -1282,7 +1284,7 @@ func TestAgent_Lifecycle(t *testing.T) {

 	t.Run("ShutdownScriptOnce", func(t *testing.T) {
 		t.Parallel()
-
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		expected := "this-is-shutdown"
 		client := &client{
 			t: t,

@@ -1293,13 +1295,13 @@ func TestAgent_Lifecycle(t *testing.T) {
 				ShutdownScript: "echo " + expected,
 			},
 			statsChan: make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}

 		fs := afero.NewMemMapFs()
 		agent := agent.New(agent.Options{
 			Client: client,
-			Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+			Logger: logger.Named("agent"),
 			Filesystem: fs,
 		})

@@ -1548,9 +1550,10 @@ func TestAgent_Speedtest(t *testing.T) {

 func TestAgent_Reconnect(t *testing.T) {
 	t.Parallel()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	// After the agent is disconnected from a coordinator, it's supposed
 	// to reconnect!
-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	defer coordinator.Close()

 	agentID := uuid.New()

@@ -1572,7 +1575,7 @@ func TestAgent_Reconnect(t *testing.T) {
 			return "", nil
 		},
 		Client: client,
-		Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+		Logger: logger.Named("agent"),
 	})
 	defer closer.Close()

@@ -1587,8 +1590,8 @@ func TestAgent_Reconnect(t *testing.T) {

 func TestAgent_WriteVSCodeConfigs(t *testing.T) {
 	t.Parallel()
-
-	coordinator := tailnet.NewCoordinator()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	coordinator := tailnet.NewCoordinator(logger)
 	defer coordinator.Close()

 	client := &client{

@@ -1607,7 +1610,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
 			return "", nil
 		},
 		Client: client,
-		Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+		Logger: logger.Named("agent"),
 		Filesystem: filesystem,
 	})
 	defer closer.Close()

@@ -1698,10 +1701,11 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	afero.Fs,
 	io.Closer,
 ) {
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	if metadata.DERPMap == nil {
 		metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
 	}
-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	t.Cleanup(func() {
 		_ = coordinator.Close()
 	})

@@ -1718,7 +1722,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	closer := agent.New(agent.Options{
 		Client: c,
 		Filesystem: fs,
-		Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+		Logger: logger.Named("agent"),
 		ReconnectingPTYTimeout: ptyTimeout,
 	})
 	t.Cleanup(func() {

@@ -1727,7 +1731,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	conn, err := tailnet.NewConn(&tailnet.Options{
 		Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
 		DERPMap: metadata.DERPMap,
-		Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
+		Logger: logger.Named("client"),
 	})
 	require.NoError(t, err)
 	clientConn, serverConn := net.Pipe()
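The test hunks above all apply one refactor: construct a single debug-leveled `slogtest` logger per test, feed it to `tailnet.NewCoordinator`, and hand named children to the agent and client so their output interleaves in one log. A compilable sketch of that wiring (the test name is illustrative and unrelated options are omitted):

```go
package agent_test

import (
	"testing"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"

	"github.com/coder/coder/agent"
	"github.com/coder/coder/tailnet"
)

func TestSharedLoggerWiring(t *testing.T) {
	t.Parallel()
	// One debug-leveled logger per test; coordinator, agent, and client
	// lines then interleave chronologically in a single test log.
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

	coordinator := tailnet.NewCoordinator(logger)
	t.Cleanup(func() { _ = coordinator.Close() })

	// Named children distinguish the components in the shared output.
	agentOpts := agent.Options{Logger: logger.Named("agent")}
	connOpts := tailnet.Options{Logger: logger.Named("client")}
	_, _ = agentOpts, connOpts
}
```

A side effect worth noting: `slogtest` loggers by default treat error-level logs as test failures, so raising verbosity to debug adds detail without changing what fails a test.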
@@ -221,7 +221,7 @@ func New(options *Options) *API {
 		options.PrometheusRegistry = prometheus.NewRegistry()
 	}
 	if options.TailnetCoordinator == nil {
-		options.TailnetCoordinator = tailnet.NewCoordinator()
+		options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
 	}
 	if options.DERPServer == nil {
 		options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
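In coderd, the logger is threaded through the existing nil-default block, so callers that inject their own coordinator are unaffected. A reduced sketch of that defaulting pattern; the `Options` struct here is a stand-in for coderd's much larger one:

```go
package coderddemo

import (
	"cdr.dev/slog"

	"github.com/coder/coder/tailnet"
)

// Options is a stand-in for coderd's Options struct; only the two fields
// relevant to the coordinator default are reproduced here.
type Options struct {
	Logger             slog.Logger
	TailnetCoordinator tailnet.Coordinator
}

// applyDefaults mirrors the nil-check pattern from coderd.New: the
// coordinator inherits the API logger unless the caller injected one.
func applyDefaults(options *Options) {
	if options.TailnetCoordinator == nil {
		options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
	}
}
```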
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"

 	"github.com/coder/coder/coderd/coderdtest"

@@ -298,7 +299,7 @@ func TestAgents(t *testing.T) {
 		coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)

 		// given
-		coordinator := tailnet.NewCoordinator()
+		coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
 		coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
 		coordinatorPtr.Store(&coordinator)
 		derpMap := tailnettest.RunDERPAndSTUN(t)
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/agent"
 	"github.com/coder/coder/coderd/coderdtest"

@@ -364,7 +365,7 @@ func createWorkspaceWithApps(t *testing.T, client *codersdk.Client, orgID uuid.U
 	}
 	agentCloser := agent.New(agent.Options{
 		Client: agentClient,
-		Logger: slogtest.Make(t, nil).Named("agent"),
+		Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
 	})
 	t.Cleanup(func() {
 		_ = agentCloser.Close()
@@ -600,6 +600,8 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
 	if !ok {
 		return
 	}
+	log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
+	log.Debug(ctx, "resolved PTY request")

 	values := r.URL.Query()
 	parser := httpapi.NewQueryParamParser()

@@ -632,19 +634,22 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {

 	agentConn, release, err := s.WorkspaceConnCache.Acquire(appToken.AgentID)
 	if err != nil {
-		s.Logger.Debug(ctx, "dial workspace agent", slog.Error(err))
+		log.Debug(ctx, "dial workspace agent", slog.Error(err))
 		_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
 		return
 	}
 	defer release()
+	log.Debug(ctx, "dialed workspace agent")
 	ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"))
 	if err != nil {
-		s.Logger.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
+		log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
 		_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
 		return
 	}
 	defer ptNetConn.Close()
+	log.Debug(ctx, "obtained PTY")
 	agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
+	log.Debug(ctx, "pty Bicopy finished")
 }

 // wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
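The proxy hunks replace the server-wide `s.Logger` with a request-scoped `log` carrying the agent id, then drop a debug breadcrumb after each stage of the dial, which makes a hung reconnecting PTY session traceable. A compilable sketch of that shape, with the surrounding handler reduced to stand-ins (only the logger calls mirror the diff):

```go
package proxydemo

import (
	"context"

	"cdr.dev/slog"
)

// dialPTY sketches the request-scoped logger pattern: derive one logger
// carrying the request's identifying field up front, then leave a debug
// breadcrumb after each step so a stuck session can be located in logs.
// The elided steps stand in for the real cache acquire / PTY dial / copy.
func dialPTY(ctx context.Context, base slog.Logger, agentID string) {
	log := base.With(slog.F("agent_id", agentID))
	log.Debug(ctx, "resolved PTY request")

	// ... acquire the cached agent connection ...
	log.Debug(ctx, "dialed workspace agent")

	// ... open the reconnecting PTY stream over the tailnet ...
	log.Debug(ctx, "obtained PTY")

	// ... copy bytes both ways until either side closes ...
	log.Debug(ctx, "pty Bicopy finished")
}
```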
@@ -156,10 +156,10 @@ func TestCache(t *testing.T) {

 func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Duration) *codersdk.WorkspaceAgentConn {
 	t.Helper()
-
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)

-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	t.Cleanup(func() {
 		_ = coordinator.Close()
 	})

@@ -171,7 +171,7 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
 			manifest: manifest,
 			coordinator: coordinator,
 		},
-		Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelInfo),
+		Logger: logger.Named("agent"),
 		ReconnectingPTYTimeout: ptyTimeout,
 	})
 	t.Cleanup(func() {
@@ -390,7 +390,7 @@ func (api *API) updateEntitlements(ctx context.Context) error {
 	}

 	if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
-		coordinator := agpltailnet.NewCoordinator()
+		coordinator := agpltailnet.NewCoordinator(api.Logger)
 		if enabled {
 			haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
 			if err != nil {
@@ -13,6 +13,8 @@ import (
 	"sync/atomic"
 	"time"

+	"cdr.dev/slog"
+
 	"github.com/google/uuid"
 	lru "github.com/hashicorp/golang-lru/v2"
 	"golang.org/x/exp/slices"

@@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
 	}, errChan
 }

+const loggerName = "coord"
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
-func NewCoordinator() Coordinator {
+func NewCoordinator(logger slog.Logger) Coordinator {
 	nameCache, err := lru.New[uuid.UUID, string](512)
 	if err != nil {
 		panic("make lru cache: " + err.Error())
 	}

 	return &coordinator{
+		logger: logger.Named(loggerName),
 		closed: false,
 		nodes: map[uuid.UUID]*Node{},
 		agentSockets: map[uuid.UUID]*TrackedConn{},

@@ -137,6 +142,7 @@ func NewCoordinator() Coordinator {
 // This coordinator is incompatible with multiple Coder
 // replicas as all node data is in-memory.
 type coordinator struct {
+	logger slog.Logger
 	mutex sync.RWMutex
 	closed bool

@@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int {
 // ServeClient accepts a WebSocket connection that wants to connect to an agent
 // with the specified ID.
 func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
+	logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
+	logger.Debug(context.Background(), "coordinating client")
 	c.mutex.Lock()
 	if c.closed {
 		c.mutex.Unlock()

@@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 		return xerrors.Errorf("marshal node: %w", err)
 	}
 	_, err = conn.Write(data)
+	logger.Debug(context.Background(), "wrote initial node")
 	if err != nil {
 		return xerrors.Errorf("write nodes: %w", err)
 	}

@@ -230,20 +239,24 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 		LastWrite: now,
 	}
 	c.mutex.Unlock()
+	logger.Debug(context.Background(), "added tracked connection")
 	defer func() {
 		c.mutex.Lock()
 		defer c.mutex.Unlock()
 		// Clean all traces of this connection from the map.
 		delete(c.nodes, id)
+		logger.Debug(context.Background(), "deleted client node")
 		connectionSockets, ok := c.agentToConnectionSockets[agent]
 		if !ok {
 			return
 		}
 		delete(connectionSockets, id)
+		logger.Debug(context.Background(), "deleted client connectionSocket from map")
 		if len(connectionSockets) != 0 {
 			return
 		}
 		delete(c.agentToConnectionSockets, agent)
+		logger.Debug(context.Background(), "deleted last client connectionSocket from map")
 	}()

 	decoder := json.NewDecoder(conn)

@@ -259,11 +272,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 }

 func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
+	logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
 	var node Node
 	err := decoder.Decode(&node)
 	if err != nil {
 		return xerrors.Errorf("read json: %w", err)
 	}
+	logger.Debug(context.Background(), "got client node update", slog.F("node", node))

 	c.mutex.Lock()
 	// Update the node of this client in our in-memory map. If an agent entirely

@@ -274,6 +289,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 	agentSocket, ok := c.agentSockets[agent]
 	if !ok {
 		c.mutex.Unlock()
+		logger.Debug(context.Background(), "no agent socket, unable to send node")
 		return nil
 	}
 	c.mutex.Unlock()

@@ -291,6 +307,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 		}
 		return xerrors.Errorf("write json: %w", err)
 	}
+	logger.Debug(context.Background(), "sent client node to agent")

 	return nil
 }

@@ -298,6 +315,8 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 // ServeAgent accepts a WebSocket connection to an agent that
 // listens to incoming connections and publishes node updates.
 func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
+	logger := c.logger.With(slog.F("agent_id", id))
+	logger.Debug(context.Background(), "coordinating agent")
 	c.mutex.Lock()
 	if c.closed {
 		c.mutex.Unlock()

@@ -324,6 +343,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 		return xerrors.Errorf("marshal json: %w", err)
 	}
 	_, err = conn.Write(data)
+	logger.Debug(context.Background(), "wrote initial client(s) to agent", slog.F("nodes", nodes))
 	if err != nil {
 		return xerrors.Errorf("write nodes: %w", err)
 	}

@@ -356,6 +376,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 	}

 	c.mutex.Unlock()
+	logger.Debug(context.Background(), "added agent socket")
 	defer func() {
 		c.mutex.Lock()
 		defer c.mutex.Unlock()

@@ -365,6 +386,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 		if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
 			delete(c.agentSockets, id)
 			delete(c.nodes, id)
+			logger.Debug(context.Background(), "deleted agent socket")
 		}
 	}()

@@ -381,17 +403,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 }

 func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
+	logger := c.logger.With(slog.F("agent_id", id))
 	var node Node
 	err := decoder.Decode(&node)
 	if err != nil {
 		return xerrors.Errorf("read json: %w", err)
 	}
+	logger.Debug(context.Background(), "decoded agent node", slog.F("node", node))

 	c.mutex.Lock()
 	c.nodes[id] = &node
 	connectionSockets, ok := c.agentToConnectionSockets[id]
 	if !ok {
 		c.mutex.Unlock()
+		logger.Debug(context.Background(), "no client sockets; unable to send node")
 		return nil
 	}
 	data, err := json.Marshal([]*Node{&node})

@@ -403,11 +428,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
 	// Publish the new node to every listening socket.
 	var wg sync.WaitGroup
 	wg.Add(len(connectionSockets))
-	for _, connectionSocket := range connectionSockets {
+	for clientID, connectionSocket := range connectionSockets {
+		clientID := clientID
 		connectionSocket := connectionSocket
 		go func() {
 			_ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second))
-			_, _ = connectionSocket.Write(data)
+			_, err := connectionSocket.Write(data)
+			logger.Debug(context.Background(), "sent agent node to client",
+				slog.F("client_id", clientID), slog.Error(err))
 			wg.Done()
 		}()
 	}
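The publish loop at the end of the coordinator changes is also a small concurrency study: each goroutine re-binds `clientID` and `connectionSocket` because, before Go 1.22, range variables were reused across iterations and capturing them directly would race. A standalone sketch of the same fan-out under stand-in types (`net.Conn` in place of the coordinator's tracked sockets; unlike the real code, the sketch waits locally before returning):

```go
package coorddemo

import (
	"context"
	"net"
	"sync"
	"time"

	"cdr.dev/slog"
)

// publish fans a marshaled node update out to every listening client
// socket, logging the outcome per client as the commit now does. Note
// that slog.Error(err) is emitted even when err is nil, so every send
// leaves a trace.
func publish(logger slog.Logger, sockets map[string]net.Conn, data []byte) {
	var wg sync.WaitGroup
	wg.Add(len(sockets))
	for clientID, sock := range sockets {
		// Re-bind the range variables: before Go 1.22 they were shared
		// across iterations, so each goroutine must capture copies.
		clientID, sock := clientID, sock
		go func() {
			defer wg.Done()
			_ = sock.SetWriteDeadline(time.Now().Add(5 * time.Second))
			_, err := sock.Write(data)
			logger.Debug(context.Background(), "sent agent node to client",
				slog.F("client_id", clientID), slog.Error(err))
		}()
	}
	wg.Wait()
}
```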
@@ -4,6 +4,9 @@ import (
 	"net"
 	"testing"

+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/slogtest"
+
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -16,7 +19,8 @@ func TestCoordinator(t *testing.T) {
 	t.Parallel()
 	t.Run("ClientWithoutAgent", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 		client, server := net.Pipe()
 		sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
 			return nil

@@ -40,7 +44,8 @@ func TestCoordinator(t *testing.T) {

 	t.Run("AgentWithoutClients", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 		client, server := net.Pipe()
 		sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
 			return nil

@@ -64,7 +69,8 @@ func TestCoordinator(t *testing.T) {

 	t.Run("AgentWithClient", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)

 		agentWS, agentServerWS := net.Pipe()
 		defer agentWS.Close()

@@ -148,7 +154,8 @@ func TestCoordinator(t *testing.T) {

 	t.Run("AgentDoubleConnect", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)

 		agentWS1, agentServerWS1 := net.Pipe()
 		defer agentWS1.Close()