diff --git a/agent/agent.go b/agent/agent.go index 9846a7a314..165c735989 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -24,6 +24,7 @@ import ( "github.com/armon/circbuf" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -63,6 +64,8 @@ type Options struct { SSHMaxTimeout time.Duration TailnetListenPort uint16 Subsystem codersdk.AgentSubsystem + + PrometheusRegistry *prometheus.Registry } type Client interface { @@ -102,6 +105,12 @@ func New(options Options) Agent { return "", nil } } + + prometheusRegistry := options.PrometheusRegistry + if prometheusRegistry == nil { + prometheusRegistry = prometheus.NewRegistry() + } + ctx, cancelFunc := context.WithCancel(context.Background()) a := &agent{ tailnetListenPort: options.TailnetListenPort, @@ -121,6 +130,9 @@ func New(options Options) Agent { connStatsChan: make(chan *agentsdk.Stats, 1), sshMaxTimeout: options.SSHMaxTimeout, subsystem: options.Subsystem, + + prometheusRegistry: prometheusRegistry, + metrics: newAgentMetrics(prometheusRegistry), } a.init(ctx) return a @@ -165,10 +177,13 @@ type agent struct { latestStat atomic.Pointer[agentsdk.Stats] connCountReconnectingPTY atomic.Int64 + + prometheusRegistry *prometheus.Registry + metrics *agentMetrics } func (a *agent) init(ctx context.Context) { - sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.filesystem, a.sshMaxTimeout, "") + sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.prometheusRegistry, a.filesystem, a.sshMaxTimeout, "") if err != nil { panic(err) } @@ -983,6 +998,7 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) { defer conn.Close() + a.metrics.connectionsTotal.Add(1) a.connCountReconnectingPTY.Add(1) defer 
a.connCountReconnectingPTY.Add(-1) @@ -1022,6 +1038,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m // Empty command will default to the users shell! cmd, err := a.sshServer.CreateCommand(ctx, msg.Command, nil) if err != nil { + a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1) return xerrors.Errorf("create command: %w", err) } cmd.Env = append(cmd.Env, "TERM=xterm-256color") @@ -1034,6 +1051,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m ptty, process, err := pty.Start(cmd) if err != nil { + a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1) return xerrors.Errorf("start command: %w", err) } @@ -1060,7 +1078,12 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m if err != nil { // When the PTY is closed, this is triggered. // Error is typically a benign EOF, so only log for debugging. - logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err)) + if errors.Is(err, io.EOF) { + logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err)) + } else { + logger.Warn(ctx, "unable to read pty output, command exited?", slog.Error(err)) + a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1) + } break } part := buffer[:read] @@ -1075,11 +1098,12 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m for cid, conn := range rpty.activeConns { _, err = conn.Write(part) if err != nil { - logger.Debug(ctx, + logger.Warn(ctx, "error writing to active conn", slog.F("other_conn_id", cid), slog.Error(err), ) + a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1) } } rpty.activeConnsMutex.Unlock() @@ -1099,6 +1123,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m if err != nil { // We can continue after this, it's not fatal! 
logger.Error(ctx, "resize", slog.Error(err)) + a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1) } // Write any previously stored data for the TTY. rpty.circularBufferMutex.RLock() @@ -1111,6 +1136,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m // while also holding circularBufferMutex seems dangerous. _, err = conn.Write(prevBuf) if err != nil { + a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1) return xerrors.Errorf("write buffer to conn: %w", err) } // Multiple connections to the same TTY are permitted. @@ -1161,6 +1187,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m _, err = rpty.ptty.InputWriter().Write([]byte(req.Data)) if err != nil { logger.Warn(ctx, "write to pty", slog.Error(err)) + a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1) return nil } // Check if a resize needs to happen! @@ -1171,6 +1198,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m if err != nil { // We can continue after this, it's not fatal! logger.Error(ctx, "resize", slog.Error(err)) + a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1) } } } @@ -1203,7 +1231,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) { var mu sync.Mutex status := a.network.Status() durations := []float64{} - ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) defer cancelFunc() for nodeID, peer := range status.Peer { if !peer.Active { @@ -1219,7 +1247,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - duration, _, _, err := a.network.Ping(ctx, addresses[0].Addr()) + duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr()) if err != nil { return } @@ -1244,7 +1272,10 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) { // Collect agent metrics. 
// Agent metrics are changing all the time, so there is no need to perform // reflect.DeepEqual to see if stats should be transferred. - stats.Metrics = collectMetrics() + + metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + stats.Metrics = a.collectMetrics(metricsCtx) a.latestStat.Store(stats) diff --git a/agent/agent_test.go b/agent/agent_test.go index 1dab56c2c7..832919e7bf 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -27,6 +27,8 @@ import ( "github.com/google/uuid" "github.com/pion/udp" "github.com/pkg/sftp" + "github.com/prometheus/client_golang/prometheus" + promgo "github.com/prometheus/client_model/go" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1724,7 +1726,7 @@ func (c closeFunc) Close() error { return c() } -func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration) ( +func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration, opts ...func(agent.Options) agent.Options) ( *codersdk.WorkspaceAgentConn, *client, <-chan *agentsdk.Stats, @@ -1749,12 +1751,19 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati statsChan: statsCh, coordinator: coordinator, } - closer := agent.New(agent.Options{ + + options := agent.Options{ Client: c, Filesystem: fs, Logger: logger.Named("agent"), ReconnectingPTYTimeout: ptyTimeout, - }) + } + + for _, opt := range opts { + options = opt(options) + } + + closer := agent.New(options) t.Cleanup(func() { _ = closer.Close() }) @@ -1979,3 +1988,110 @@ func tempDirUnixSocket(t *testing.T) string { return t.TempDir() } + +func TestAgent_Metrics_SSH(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + registry := prometheus.NewRegistry() + + //nolint:dogsled + conn, _, _, _, _ := setupAgent(t, agentsdk.Manifest{}, 0, func(o agent.Options) agent.Options { + 
o.PrometheusRegistry = registry + return o + }) + + sshClient, err := conn.SSHClient(ctx) + require.NoError(t, err) + defer sshClient.Close() + session, err := sshClient.NewSession() + require.NoError(t, err) + defer session.Close() + stdin, err := session.StdinPipe() + require.NoError(t, err) + err = session.Shell() + require.NoError(t, err) + + expected := []agentsdk.AgentMetric{ + { + Name: "agent_reconnecting_pty_connections_total", + Type: agentsdk.AgentMetricTypeCounter, + Value: 0, + }, + { + Name: "agent_sessions_total", + Type: agentsdk.AgentMetricTypeCounter, + Value: 1, + Labels: []agentsdk.AgentMetricLabel{ + { + Name: "magic_type", + Value: "ssh", + }, + { + Name: "pty", + Value: "no", + }, + }, + }, + { + Name: "agent_ssh_server_failed_connections_total", + Type: agentsdk.AgentMetricTypeCounter, + Value: 0, + }, + { + Name: "agent_ssh_server_sftp_connections_total", + Type: agentsdk.AgentMetricTypeCounter, + Value: 0, + }, + { + Name: "agent_ssh_server_sftp_server_errors_total", + Type: agentsdk.AgentMetricTypeCounter, + Value: 0, + }, + } + + var actual []*promgo.MetricFamily + assert.Eventually(t, func() bool { + actual, err = registry.Gather() + if err != nil { + return false + } + + if len(expected) != len(actual) { + return false + } + + return verifyCollectedMetrics(t, expected, actual) + }, testutil.WaitLong, testutil.IntervalFast) + + require.Len(t, actual, len(expected)) + collected := verifyCollectedMetrics(t, expected, actual) + require.True(t, collected, "expected metrics were not collected") + + _ = stdin.Close() + err = session.Wait() + require.NoError(t, err) +} + +func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actual []*promgo.MetricFamily) bool { + t.Helper() + + for i, e := range expected { + assert.Equal(t, e.Name, actual[i].GetName()) + assert.Equal(t, string(e.Type), strings.ToLower(actual[i].GetType().String())) + + for _, m := range actual[i].GetMetric() { + assert.Equal(t, e.Value, 
m.Counter.GetValue()) + + if len(m.GetLabel()) > 0 { + for j, lbl := range m.GetLabel() { + assert.Equal(t, e.Labels[j].Name, lbl.GetName()) + assert.Equal(t, e.Labels[j].Value, lbl.GetValue()) + } + } + m.GetLabel() + } + } + return true +} diff --git a/agent/agentssh/agentssh.go b/agent/agentssh/agentssh.go index 6221751ae8..fcf562a888 100644 --- a/agent/agentssh/agentssh.go +++ b/agent/agentssh/agentssh.go @@ -20,6 +20,7 @@ import ( "github.com/gliderlabs/ssh" "github.com/pkg/sftp" + "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "go.uber.org/atomic" gossh "golang.org/x/crypto/ssh" @@ -69,9 +70,11 @@ type Server struct { connCountVSCode atomic.Int64 connCountJetBrains atomic.Int64 connCountSSHSession atomic.Int64 + + metrics *sshServerMetrics } -func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) { +func NewServer(ctx context.Context, logger slog.Logger, prometheusRegistry *prometheus.Registry, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) { // Clients' should ignore the host key when connecting. // The agent needs to authenticate with coderd to SSH, // so SSH authentication doesn't improve security. 
@@ -90,6 +93,7 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout forwardHandler := &ssh.ForwardedTCPHandler{} unixForwardHandler := &forwardedUnixHandler{log: logger} + metrics := newSSHServerMetrics(prometheusRegistry) s := &Server{ listeners: make(map[net.Listener]struct{}), fs: fs, @@ -97,6 +101,8 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout sessions: make(map[ssh.Session]struct{}), logger: logger, x11SocketDir: x11SocketDir, + + metrics: metrics, } s.srv = &ssh.Server{ @@ -106,7 +112,8 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout "session": ssh.DefaultSessionHandler, }, ConnectionFailedCallback: func(_ net.Conn, err error) { - s.logger.Info(ctx, "ssh connection ended", slog.Error(err)) + s.logger.Warn(ctx, "ssh connection failed", slog.Error(err)) + metrics.failedConnectionsTotal.Add(1) }, Handler: s.sessionHandler, HostSigners: []ssh.Signer{randomSigner}, @@ -197,7 +204,7 @@ func (s *Server) sessionHandler(session ssh.Session) { err := s.sessionStart(session, extraEnv) var exitError *exec.ExitError if xerrors.As(err, &exitError) { - s.logger.Debug(ctx, "ssh session returned", slog.Error(exitError)) + s.logger.Warn(ctx, "ssh session returned", slog.Error(exitError)) _ = session.Exit(exitError.ExitCode()) return } @@ -236,14 +243,28 @@ func (s *Server) sessionStart(session ssh.Session, extraEnv []string) (retErr er s.logger.Warn(ctx, "invalid magic ssh session type specified", slog.F("type", magicType)) } + magicTypeLabel := magicTypeMetricLabel(magicType) + sshPty, windowSize, isPty := session.Pty() + cmd, err := s.CreateCommand(ctx, session.RawCommand(), env) if err != nil { + ptyLabel := "no" + if isPty { + ptyLabel = "yes" + } + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, ptyLabel, "create_command").Add(1) return err } if ssh.AgentRequested(session) { l, err := ssh.NewAgentListener() if err != nil { + ptyLabel := "no" + if isPty { + 
ptyLabel = "yes" + } + + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, ptyLabel, "listener").Add(1) return xerrors.Errorf("new agent listener: %w", err) } defer l.Close() @@ -251,28 +272,34 @@ func (s *Server) sessionStart(session ssh.Session, extraEnv []string) (retErr er cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", "SSH_AUTH_SOCK", l.Addr().String())) } - sshPty, windowSize, isPty := session.Pty() if isPty { - return s.startPTYSession(session, cmd, sshPty, windowSize) + return s.startPTYSession(session, magicTypeLabel, cmd, sshPty, windowSize) } - return startNonPTYSession(session, cmd.AsExec()) + return s.startNonPTYSession(session, magicTypeLabel, cmd.AsExec()) } -func startNonPTYSession(session ssh.Session, cmd *exec.Cmd) error { +func (s *Server) startNonPTYSession(session ssh.Session, magicTypeLabel string, cmd *exec.Cmd) error { + s.metrics.sessionsTotal.WithLabelValues(magicTypeLabel, "no").Add(1) + cmd.Stdout = session cmd.Stderr = session.Stderr() // This blocks forever until stdin is received if we don't // use StdinPipe. It's unknown what causes this. 
stdinPipe, err := cmd.StdinPipe() if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_pipe").Add(1) return xerrors.Errorf("create stdin pipe: %w", err) } go func() { - _, _ = io.Copy(stdinPipe, session) + _, err := io.Copy(stdinPipe, session) + if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_io_copy").Add(1) + } _ = stdinPipe.Close() }() err = cmd.Start() if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "start_command").Add(1) return xerrors.Errorf("start: %w", err) } return cmd.Wait() @@ -287,7 +314,9 @@ type ptySession interface { RawCommand() string } -func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pty, windowSize <-chan ssh.Window) (retErr error) { +func (s *Server) startPTYSession(session ptySession, magicTypeLabel string, cmd *pty.Cmd, sshPty ssh.Pty, windowSize <-chan ssh.Window) (retErr error) { + s.metrics.sessionsTotal.WithLabelValues(magicTypeLabel, "yes").Add(1) + ctx := session.Context() // Disable minimal PTY emulation set by gliderlabs/ssh (NL-to-CRNL). // See https://github.com/coder/coder/issues/3371. 
@@ -299,6 +328,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt err := showMOTD(session, manifest.MOTDFile) if err != nil { s.logger.Error(ctx, "show MOTD", slog.Error(err)) + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "motd").Add(1) } } else { s.logger.Warn(ctx, "metadata lookup failed, unable to show MOTD") @@ -313,12 +343,14 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt pty.WithLogger(slog.Stdlib(ctx, s.logger, slog.LevelInfo)), )) if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "start_command").Add(1) return xerrors.Errorf("start command: %w", err) } defer func() { closeErr := ptty.Close() if closeErr != nil { s.logger.Warn(ctx, "failed to close tty", slog.Error(closeErr)) + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "close").Add(1) if retErr == nil { retErr = closeErr } @@ -330,12 +362,16 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt // If the pty is closed, then command has exited, no need to log. if resizeErr != nil && !errors.Is(resizeErr, pty.ErrClosed) { s.logger.Warn(ctx, "failed to resize tty", slog.Error(resizeErr)) + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "resize").Add(1) } } }() go func() { - _, _ = io.Copy(ptty.InputWriter(), session) + _, err := io.Copy(ptty.InputWriter(), session) + if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "input_io_copy").Add(1) + } }() // We need to wait for the command output to finish copying. 
It's safe to @@ -349,6 +385,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt n, err := io.Copy(session, ptty.OutputReader()) s.logger.Debug(ctx, "copy output done", slog.F("bytes", n), slog.Error(err)) if err != nil { + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "output_io_copy").Add(1) return xerrors.Errorf("copy error: %w", err) } // We've gotten all the output, but we need to wait for the process to @@ -360,6 +397,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt // and not something to be concerned about. But, if it's something else, we should log it. if err != nil && !xerrors.As(err, &exitErr) { s.logger.Warn(ctx, "wait error", slog.Error(err)) + s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "wait").Add(1) } if err != nil { return xerrors.Errorf("process wait: %w", err) @@ -368,6 +406,8 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt } func (s *Server) sftpHandler(session ssh.Session) { + s.metrics.sftpConnectionsTotal.Add(1) + ctx := session.Context() // Typically sftp sessions don't request a TTY, but if they do, @@ -407,6 +447,7 @@ func (s *Server) sftpHandler(session ssh.Session) { return } s.logger.Warn(ctx, "sftp server closed with error", slog.Error(err)) + s.metrics.sftpServerErrors.Add(1) _ = session.Exit(1) } diff --git a/agent/agentssh/agentssh_internal_test.go b/agent/agentssh/agentssh_internal_test.go index ed05e53a04..8cee9b7a86 100644 --- a/agent/agentssh/agentssh_internal_test.go +++ b/agent/agentssh/agentssh_internal_test.go @@ -10,6 +10,7 @@ import ( "testing" gliderssh "github.com/gliderlabs/ssh" + "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,7 +37,7 @@ func Test_sessionStart_orphan(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium) defer cancel() 
logger := slogtest.Make(t, nil) - s, err := NewServer(ctx, logger, afero.NewMemMapFs(), 0, "") + s, err := NewServer(ctx, logger, prometheus.NewRegistry(), afero.NewMemMapFs(), 0, "") require.NoError(t, err) // Here we're going to call the handler directly with a faked SSH session @@ -57,10 +58,11 @@ func Test_sessionStart_orphan(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) + // we don't really care what the error is here. In the larger scenario, // the client has disconnected, so we can't return any error information // to them. - _ = s.startPTYSession(sess, cmd, ptyInfo, windowSize) + _ = s.startPTYSession(sess, "ssh", cmd, ptyInfo, windowSize) }() readDone := make(chan struct{}) diff --git a/agent/agentssh/agentssh_test.go b/agent/agentssh/agentssh_test.go index b1675f0029..d2cb56d44d 100644 --- a/agent/agentssh/agentssh_test.go +++ b/agent/agentssh/agentssh_test.go @@ -10,6 +10,7 @@ import ( "sync" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,7 +34,7 @@ func TestNewServer_ServeClient(t *testing.T) { ctx := context.Background() logger := slogtest.Make(t, nil) - s, err := agentssh.NewServer(ctx, logger, afero.NewMemMapFs(), 0, "") + s, err := agentssh.NewServer(ctx, logger, prometheus.NewRegistry(), afero.NewMemMapFs(), 0, "") require.NoError(t, err) // The assumption is that these are set before serving SSH connections. @@ -74,7 +75,7 @@ func TestNewServer_CloseActiveConnections(t *testing.T) { ctx := context.Background() logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}) - s, err := agentssh.NewServer(ctx, logger, afero.NewMemMapFs(), 0, "") + s, err := agentssh.NewServer(ctx, logger, prometheus.NewRegistry(), afero.NewMemMapFs(), 0, "") require.NoError(t, err) // The assumption is that these are set before serving SSH connections. 
diff --git a/agent/agentssh/metrics.go b/agent/agentssh/metrics.go new file mode 100644 index 0000000000..88ee100d65 --- /dev/null +++ b/agent/agentssh/metrics.go @@ -0,0 +1,82 @@ +package agentssh + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type sshServerMetrics struct { + failedConnectionsTotal prometheus.Counter + sftpConnectionsTotal prometheus.Counter + sftpServerErrors prometheus.Counter + x11HandlerErrors *prometheus.CounterVec + sessionsTotal *prometheus.CounterVec + sessionErrors *prometheus.CounterVec +} + +func newSSHServerMetrics(registerer prometheus.Registerer) *sshServerMetrics { + failedConnectionsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "agent", Subsystem: "ssh_server", Name: "failed_connections_total", + }) + registerer.MustRegister(failedConnectionsTotal) + + sftpConnectionsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "agent", Subsystem: "ssh_server", Name: "sftp_connections_total", + }) + registerer.MustRegister(sftpConnectionsTotal) + + sftpServerErrors := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "agent", Subsystem: "ssh_server", Name: "sftp_server_errors_total", + }) + registerer.MustRegister(sftpServerErrors) + + x11HandlerErrors := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "agent", + Subsystem: "x11_handler", + Name: "errors_total", + }, + []string{"error_type"}, + ) + registerer.MustRegister(x11HandlerErrors) + + sessionsTotal := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "agent", + Subsystem: "sessions", + Name: "total", + }, + []string{"magic_type", "pty"}, + ) + registerer.MustRegister(sessionsTotal) + + sessionErrors := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "agent", + Subsystem: "sessions", + Name: "errors_total", + }, + []string{"magic_type", "pty", "error_type"}, + ) + registerer.MustRegister(sessionErrors) + + return &sshServerMetrics{ + failedConnectionsTotal: 
failedConnectionsTotal, + sftpConnectionsTotal: sftpConnectionsTotal, + sftpServerErrors: sftpServerErrors, + x11HandlerErrors: x11HandlerErrors, + sessionsTotal: sessionsTotal, + sessionErrors: sessionErrors, + } +} + +func magicTypeMetricLabel(magicType string) string { + switch magicType { + case MagicSessionTypeVSCode: + case MagicSessionTypeJetBrains: + case "": + magicType = "ssh" + default: + magicType = "unknown" + } + return magicType +} diff --git a/agent/agentssh/x11.go b/agent/agentssh/x11.go index b301326a0a..00c2819cc0 100644 --- a/agent/agentssh/x11.go +++ b/agent/agentssh/x11.go @@ -27,18 +27,21 @@ func (s *Server) x11Callback(ctx ssh.Context, x11 ssh.X11) bool { hostname, err := os.Hostname() if err != nil { s.logger.Warn(ctx, "failed to get hostname", slog.Error(err)) + s.metrics.x11HandlerErrors.WithLabelValues("hostname").Add(1) return false } err = s.fs.MkdirAll(s.x11SocketDir, 0o700) if err != nil { s.logger.Warn(ctx, "failed to make the x11 socket dir", slog.F("dir", s.x11SocketDir), slog.Error(err)) + s.metrics.x11HandlerErrors.WithLabelValues("socket_dir").Add(1) return false } err = addXauthEntry(ctx, s.fs, hostname, strconv.Itoa(int(x11.ScreenNumber)), x11.AuthProtocol, x11.AuthCookie) if err != nil { s.logger.Warn(ctx, "failed to add Xauthority entry", slog.Error(err)) + s.metrics.x11HandlerErrors.WithLabelValues("xauthority").Add(1) return false } return true diff --git a/agent/agentssh/x11_test.go b/agent/agentssh/x11_test.go index cd935d3268..1fce885bab 100644 --- a/agent/agentssh/x11_test.go +++ b/agent/agentssh/x11_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/gliderlabs/ssh" + "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,7 +34,7 @@ func TestServer_X11(t *testing.T) { logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) fs := afero.NewOsFs() dir := t.TempDir() - s, err := agentssh.NewServer(ctx, logger, fs, 
0, dir) + s, err := agentssh.NewServer(ctx, logger, prometheus.NewRegistry(), fs, 0, dir) require.NoError(t, err) defer s.Close() diff --git a/agent/metrics.go b/agent/metrics.go index fd195202c0..dc5fb6c018 100644 --- a/agent/metrics.go +++ b/agent/metrics.go @@ -1,18 +1,51 @@ package agent import ( + "context" "fmt" "strings" + "github.com/prometheus/client_golang/prometheus" + prompb "github.com/prometheus/client_model/go" "tailscale.com/util/clientmetric" + "cdr.dev/slog" + "github.com/coder/coder/codersdk/agentsdk" ) -func collectMetrics() []agentsdk.AgentMetric { - // Tailscale metrics +type agentMetrics struct { + connectionsTotal prometheus.Counter + reconnectingPTYErrors *prometheus.CounterVec +} + +func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics { + connectionsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "agent", Subsystem: "reconnecting_pty", Name: "connections_total", + }) + registerer.MustRegister(connectionsTotal) + + reconnectingPTYErrors := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "agent", + Subsystem: "reconnecting_pty", + Name: "errors_total", + }, + []string{"error_type"}, + ) + registerer.MustRegister(reconnectingPTYErrors) + + return &agentMetrics{ + connectionsTotal: connectionsTotal, + reconnectingPTYErrors: reconnectingPTYErrors, + } +} + +func (a *agent) collectMetrics(ctx context.Context) []agentsdk.AgentMetric { + var collected []agentsdk.AgentMetric + + // Tailscale internal metrics metrics := clientmetric.Metrics() - collected := make([]agentsdk.AgentMetric, 0, len(metrics)) for _, m := range metrics { if isIgnoredMetric(m.Name()) { continue @@ -24,9 +57,54 @@ func collectMetrics() []agentsdk.AgentMetric { Value: float64(m.Value()), }) } + + metricFamilies, err := a.prometheusRegistry.Gather() + if err != nil { + a.logger.Error(ctx, "can't gather agent metrics", slog.Error(err)) + return collected + } + + for _, metricFamily := range metricFamilies { + for _, metric := 
range metricFamily.GetMetric() { + labels := toAgentMetricLabels(metric.Label) + + if metric.Counter != nil { + collected = append(collected, agentsdk.AgentMetric{ + Name: metricFamily.GetName(), + Type: agentsdk.AgentMetricTypeCounter, + Value: metric.Counter.GetValue(), + Labels: labels, + }) + } else if metric.Gauge != nil { + collected = append(collected, agentsdk.AgentMetric{ + Name: metricFamily.GetName(), + Type: agentsdk.AgentMetricTypeGauge, + Value: metric.Gauge.GetValue(), + Labels: labels, + }) + } else { + a.logger.Error(ctx, "unsupported metric type", slog.F("type", metricFamily.Type.String())) + } + } + } return collected } +func toAgentMetricLabels(metricLabels []*prompb.LabelPair) []agentsdk.AgentMetricLabel { + if len(metricLabels) == 0 { + return nil + } + + labels := make([]agentsdk.AgentMetricLabel, 0, len(metricLabels)) + for _, metricLabel := range metricLabels { + labels = append(labels, agentsdk.AgentMetricLabel{ + Name: metricLabel.GetName(), + Value: metricLabel.GetValue(), + }) + } + return labels +} + // isIgnoredMetric checks if the metric should be ignored, as Coder agent doesn't use related features. // Expected metric families: magicsock_*, derp_*, tstun_*, netcheck_*, portmap_*, etc. 
func isIgnoredMetric(metricName string) bool { diff --git a/cli/agent.go b/cli/agent.go index cb127ce292..aae59dc4e5 100644 --- a/cli/agent.go +++ b/cli/agent.go @@ -20,6 +20,9 @@ import ( "gopkg.in/natefinch/lumberjack.v2" "tailscale.com/util/clientmetric" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" + "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" "cdr.dev/slog/sloggers/slogjson" @@ -173,8 +176,6 @@ func (r *RootCmd) workspaceAgent() *clibase.Cmd { ignorePorts[port] = "pprof" } - prometheusSrvClose := ServeHandler(ctx, logger, prometheusMetricsHandler(), prometheusAddress, "prometheus") - defer prometheusSrvClose() if port, err := extractPort(prometheusAddress); err == nil { ignorePorts[port] = "prometheus" } @@ -244,6 +245,7 @@ func (r *RootCmd) workspaceAgent() *clibase.Cmd { return xerrors.Errorf("add executable to $PATH: %w", err) } + prometheusRegistry := prometheus.NewRegistry() subsystem := inv.Environ.Get(agent.EnvAgentSubsystem) agnt := agent.New(agent.Options{ Client: client, @@ -267,8 +269,13 @@ func (r *RootCmd) workspaceAgent() *clibase.Cmd { IgnorePorts: ignorePorts, SSHMaxTimeout: sshMaxTimeout, Subsystem: codersdk.AgentSubsystem(subsystem), + + PrometheusRegistry: prometheusRegistry, }) + prometheusSrvClose := ServeHandler(ctx, logger, prometheusMetricsHandler(prometheusRegistry, logger), prometheusAddress, "prometheus") + defer prometheusSrvClose() + debugSrvClose := ServeHandler(ctx, logger, agnt.HTTPDebug(), debugAddress, "debug") defer debugSrvClose() @@ -445,11 +452,25 @@ func urlPort(u string) (int, error) { return -1, xerrors.Errorf("invalid port: %s", u) } -func prometheusMetricsHandler() http.Handler { - // We don't have any other internal metrics so far, so it's safe to expose metrics this way. 
- // Based on: https://github.com/tailscale/tailscale/blob/280255acae604796a1113861f5a84e6fa2dc6121/ipn/localapi/localapi.go#L489 +func prometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger slog.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") + + // Based on: https://github.com/tailscale/tailscale/blob/280255acae604796a1113861f5a84e6fa2dc6121/ipn/localapi/localapi.go#L489 clientmetric.WritePrometheusExpositionFormat(w) + + metricFamilies, err := prometheusRegistry.Gather() + if err != nil { + logger.Error(context.Background(), "Prometheus handler can't gather metric families", slog.Error(err)) + return + } + + for _, metricFamily := range metricFamilies { + _, err = expfmt.MetricFamilyToText(w, metricFamily) + if err != nil { + logger.Error(context.Background(), "expfmt.MetricFamilyToText failed", slog.Error(err)) + return + } + } }) } diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index bb2f2b6026..81675530aa 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -5770,6 +5770,12 @@ const docTemplate = `{ "value" ], "properties": { + "labels": { + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.AgentMetricLabel" + } + }, "name": { "type": "string" }, @@ -5789,6 +5795,21 @@ const docTemplate = `{ } } }, + "agentsdk.AgentMetricLabel": { + "type": "object", + "required": [ + "name", + "value" + ], + "properties": { + "name": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "agentsdk.AgentMetricType": { "type": "string", "enum": [ diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index ba4da7aa72..2c9080fe49 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -5076,6 +5076,12 @@ "type": "object", "required": ["name", "type", "value"], "properties": { + "labels": { + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.AgentMetricLabel" + } + }, 
"name": { "type": "string" }, @@ -5092,6 +5098,18 @@ } } }, + "agentsdk.AgentMetricLabel": { + "type": "object", + "required": ["name", "value"], + "properties": { + "name": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "agentsdk.AgentMetricType": { "type": "string", "enum": ["counter", "gauge"], diff --git a/coderd/prometheusmetrics/aggregator.go b/coderd/prometheusmetrics/aggregator.go index ba3d520468..b236bac5f6 100644 --- a/coderd/prometheusmetrics/aggregator.go +++ b/coderd/prometheusmetrics/aggregator.go @@ -5,6 +5,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/slices" "golang.org/x/xerrors" "cdr.dev/slog" @@ -62,6 +63,30 @@ type annotatedMetric struct { var _ prometheus.Collector = new(MetricsAggregator) +func (am *annotatedMetric) is(req updateRequest, m agentsdk.AgentMetric) bool { + return am.username == req.username && am.workspaceName == req.workspaceName && am.agentName == req.agentName && am.Name == m.Name && slices.Equal(am.Labels, m.Labels) +} + +func (am *annotatedMetric) asPrometheus() (prometheus.Metric, error) { + labels := make([]string, 0, len(agentMetricsLabels)+len(am.Labels)) + labelValues := make([]string, 0, len(agentMetricsLabels)+len(am.Labels)) + + labels = append(labels, agentMetricsLabels...) 
+ labelValues = append(labelValues, am.username, am.workspaceName, am.agentName) + + for _, l := range am.Labels { + labels = append(labels, l.Name) + labelValues = append(labelValues, l.Value) + } + + desc := prometheus.NewDesc(am.Name, metricHelpForAgent, labels, nil) + valueType, err := asPrometheusValueType(am.Type) + if err != nil { + return nil, err + } + return prometheus.MustNewConstMetric(desc, valueType, am.Value, labelValues...), nil +} + func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, duration time.Duration) (*MetricsAggregator, error) { metricsCleanupInterval := defaultMetricsCleanupInterval if duration > 0 { @@ -122,7 +147,7 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() { UpdateLoop: for _, m := range req.metrics { for i, q := range ma.queue { - if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name { + if q.is(req, m) { ma.queue[i].AgentMetric.Value = m.Value ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval) continue UpdateLoop @@ -146,14 +171,12 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() { output := make([]prometheus.Metric, 0, len(ma.queue)) for _, m := range ma.queue { - desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil) - valueType, err := asPrometheusValueType(m.Type) + promMetric, err := m.asPrometheus() if err != nil { ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err)) continue } - constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName) - output = append(output, constMetric) + output = append(output, promMetric) } outputCh <- output close(outputCh) diff --git a/coderd/prometheusmetrics/aggregator_test.go b/coderd/prometheusmetrics/aggregator_test.go index 68b5f94e46..d7a15a547e 100644 --- 
a/coderd/prometheusmetrics/aggregator_test.go +++ b/coderd/prometheusmetrics/aggregator_test.go @@ -44,14 +44,31 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { given2 := []agentsdk.AgentMetric{ {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 5}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 2, Labels: []agentsdk.AgentMetricLabel{ + {Name: "foobar", Value: "Foobaz"}, + {Name: "hello", Value: "world"}, + }}, {Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6}, } + commonLabels := []agentsdk.AgentMetricLabel{ + {Name: "agent_name", Value: testAgentName}, + {Name: "username", Value: testUsername}, + {Name: "workspace_name", Value: testWorkspaceName}, + } expected := []agentsdk.AgentMetric{ - {Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1}, - {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4}, - {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 3}, - {Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6}, + {Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1, Labels: commonLabels}, + {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4, Labels: commonLabels}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 5, Labels: commonLabels}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 2, Labels: []agentsdk.AgentMetricLabel{ + {Name: "agent_name", Value: testAgentName}, + {Name: "foobar", Value: "Foobaz"}, + {Name: "hello", Value: "world"}, + {Name: "username", Value: testUsername}, + {Name: "workspace_name", Value: testWorkspaceName}, + }}, + {Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6, Labels: commonLabels}, } // when @@ -83,7 +100,6 @@ func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actua return false } - // 
Metrics are expected to arrive in order for i, e := range expected { desc := actual[i].Desc() assert.Contains(t, desc.String(), e.Name) @@ -92,24 +108,31 @@ func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actua err := actual[i].Write(&d) require.NoError(t, err) - require.Equal(t, "agent_name", *d.Label[0].Name) - require.Equal(t, testAgentName, *d.Label[0].Value) - require.Equal(t, "username", *d.Label[1].Name) - require.Equal(t, testUsername, *d.Label[1].Value) - require.Equal(t, "workspace_name", *d.Label[2].Name) - require.Equal(t, testWorkspaceName, *d.Label[2].Value) - if e.Type == agentsdk.AgentMetricTypeCounter { - require.Equal(t, e.Value, *d.Counter.Value) + require.Equal(t, e.Value, d.Counter.GetValue()) } else if e.Type == agentsdk.AgentMetricTypeGauge { - require.Equal(t, e.Value, *d.Gauge.Value) + require.Equal(t, e.Value, d.Gauge.GetValue()) } else { require.Failf(t, "unsupported type: %s", string(e.Type)) } + + dtoLabels := asMetricAgentLabels(d.GetLabel()) + require.Equal(t, e.Labels, dtoLabels, d.String()) } return true } +func asMetricAgentLabels(dtoLabels []*dto.LabelPair) []agentsdk.AgentMetricLabel { + metricLabels := make([]agentsdk.AgentMetricLabel, 0, len(dtoLabels)) + for _, dtoLabel := range dtoLabels { + metricLabels = append(metricLabels, agentsdk.AgentMetricLabel{ + Name: dtoLabel.GetName(), + Value: dtoLabel.GetValue(), + }) + } + return metricLabels +} + func TestUpdateMetrics_MetricsExpire(t *testing.T) { t.Parallel() diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index 0a5d30c0f1..a14dbb54cc 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -496,9 +496,15 @@ const ( ) type AgentMetric struct { - Name string `json:"name" validate:"required"` - Type AgentMetricType `json:"type" validate:"required" enums:"counter,gauge"` - Value float64 `json:"value" validate:"required"` + Name string `json:"name" validate:"required"` + Type AgentMetricType 
`json:"type" validate:"required" enums:"counter,gauge"` + Value float64 `json:"value" validate:"required"` + Labels []AgentMetricLabel `json:"labels,omitempty"` +} + +type AgentMetricLabel struct { + Name string `json:"name" validate:"required"` + Value string `json:"value" validate:"required"` } type StatsResponse struct { diff --git a/docs/api/schemas.md b/docs/api/schemas.md index e428c8bc2f..b613608e55 100644 --- a/docs/api/schemas.md +++ b/docs/api/schemas.md @@ -20,6 +20,12 @@ ```json { + "labels": [ + { + "name": "string", + "value": "string" + } + ], "name": "string", "type": "counter", "value": 0 @@ -28,11 +34,12 @@ ### Properties -| Name | Type | Required | Restrictions | Description | -| ------- | ---------------------------------------------------- | -------- | ------------ | ----------- | -| `name` | string | true | | | -| `type` | [agentsdk.AgentMetricType](#agentsdkagentmetrictype) | true | | | -| `value` | number | true | | | +| Name | Type | Required | Restrictions | Description | +| -------- | --------------------------------------------------------------- | -------- | ------------ | ----------- | +| `labels` | array of [agentsdk.AgentMetricLabel](#agentsdkagentmetriclabel) | false | | | +| `name` | string | true | | | +| `type` | [agentsdk.AgentMetricType](#agentsdkagentmetrictype) | true | | | +| `value` | number | true | | | #### Enumerated Values @@ -41,6 +48,22 @@ | `type` | `counter` | | `type` | `gauge` | +## agentsdk.AgentMetricLabel + +```json +{ + "name": "string", + "value": "string" +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| ------- | ------ | -------- | ------------ | ----------- | +| `name` | string | true | | | +| `value` | string | true | | | + ## agentsdk.AgentMetricType ```json @@ -370,6 +393,12 @@ }, "metrics": [ { + "labels": [ + { + "name": "string", + "value": "string" + } + ], "name": "string", "type": "counter", "value": 0