feat: add agent log streaming and follow provisioner format (#8170)

This commit is contained in:
Mathias Fredriksson 2023-06-28 11:54:13 +03:00 committed by GitHub
parent c0a01ec81c
commit d3c39b60c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 722 additions and 618 deletions

View File

@ -2,31 +2,21 @@ package cliui
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"sync"
"time"
"github.com/briandowns/spinner"
"github.com/muesli/reflow/indent"
"github.com/muesli/reflow/wordwrap"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/coder/coder/codersdk"
)
var (
AgentStartError = xerrors.New("agent startup exited with non-zero exit status")
AgentShuttingDown = xerrors.New("agent is shutting down")
)
var errAgentShuttingDown = xerrors.New("agent is shutting down")
type AgentOptions struct {
WorkspaceName string
Fetch func(context.Context) (codersdk.WorkspaceAgent, error)
FetchInterval time.Duration
WarnInterval time.Duration
Fetch func(context.Context) (codersdk.WorkspaceAgent, error)
FetchLogs func(ctx context.Context, agentID uuid.UUID, after int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error)
Wait bool // If true, wait for the agent to be ready (startup script).
}
@ -35,230 +25,205 @@ func Agent(ctx context.Context, writer io.Writer, opts AgentOptions) error {
if opts.FetchInterval == 0 {
opts.FetchInterval = 500 * time.Millisecond
}
if opts.WarnInterval == 0 {
opts.WarnInterval = 30 * time.Second
if opts.FetchLogs == nil {
opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
c := make(chan []codersdk.WorkspaceAgentStartupLog)
close(c)
return c, closeFunc(func() error { return nil }), nil
}
}
var resourceMutex sync.Mutex
agent, err := opts.Fetch(ctx)
type fetchAgent struct {
agent codersdk.WorkspaceAgent
err error
}
fetchedAgent := make(chan fetchAgent, 1)
go func() {
t := time.NewTimer(0)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
agent, err := opts.Fetch(ctx)
select {
case <-fetchedAgent:
default:
}
if err != nil {
fetchedAgent <- fetchAgent{err: xerrors.Errorf("fetch workspace agent: %w", err)}
return
}
fetchedAgent <- fetchAgent{agent: agent}
t.Reset(opts.FetchInterval)
}
}
}()
fetch := func() (codersdk.WorkspaceAgent, error) {
select {
case <-ctx.Done():
return codersdk.WorkspaceAgent{}, ctx.Err()
case f := <-fetchedAgent:
if f.err != nil {
return codersdk.WorkspaceAgent{}, f.err
}
return f.agent, nil
}
}
agent, err := fetch()
if err != nil {
return xerrors.Errorf("fetch: %w", err)
}
// Fast path if the agent is ready (avoid showing connecting prompt).
// We don't take the fast path for opts.NoWait yet because we want to
// show the message.
if agent.Status == codersdk.WorkspaceAgentConnected &&
(agent.StartupScriptBehavior == codersdk.WorkspaceAgentStartupScriptBehaviorNonBlocking || agent.LifecycleState == codersdk.WorkspaceAgentLifecycleReady) {
return nil
}
sw := &stageWriter{w: writer}
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
spin := spinner.New(spinner.CharSets[78], 100*time.Millisecond, spinner.WithColor("fgHiGreen"))
spin.Writer = writer
spin.ForceOutput = true
spin.Suffix = waitingMessage(agent, opts).Spin
waitMessage := &message{}
showMessage := func() {
resourceMutex.Lock()
defer resourceMutex.Unlock()
m := waitingMessage(agent, opts)
if m.Prompt == waitMessage.Prompt {
return
}
moveUp := ""
if waitMessage.Prompt != "" {
// If this is an update, move a line up
// to keep it tidy and aligned.
moveUp = "\033[1A"
}
waitMessage = m
// Stop the spinner while we write our message.
spin.Stop()
spin.Suffix = waitMessage.Spin
// Clear the line and (if necessary) move up a line to write our message.
_, _ = fmt.Fprintf(writer, "\033[2K%s\n%s\n", moveUp, waitMessage.Prompt)
select {
case <-ctx.Done():
default:
// Safe to resume operation.
if spin.Suffix != "" {
spin.Start()
}
}
}
// Fast path for showing the error message even when using no wait,
// we do this just before starting the spinner to avoid needless
// spinning.
if agent.Status == codersdk.WorkspaceAgentConnected &&
agent.StartupScriptBehavior == codersdk.WorkspaceAgentStartupScriptBehaviorBlocking && !opts.Wait {
showMessage()
return nil
}
// Start spinning after fast paths are handled.
if spin.Suffix != "" {
spin.Start()
}
defer spin.Stop()
warnAfter := time.NewTimer(opts.WarnInterval)
defer warnAfter.Stop()
warningShown := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(warningShown)
case <-warnAfter.C:
close(warningShown)
showMessage()
}
}()
fetchInterval := time.NewTicker(opts.FetchInterval)
defer fetchInterval.Stop()
showStartupLogs := false
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-fetchInterval.C:
// It doesn't matter if we're connected or not, if the agent is
// shutting down, we don't know if it's coming back.
if agent.LifecycleState.ShuttingDown() {
return errAgentShuttingDown
}
resourceMutex.Lock()
agent, err = opts.Fetch(ctx)
if err != nil {
resourceMutex.Unlock()
return xerrors.Errorf("fetch: %w", err)
}
resourceMutex.Unlock()
switch agent.Status {
case codersdk.WorkspaceAgentConnected:
// NOTE(mafredri): Once we have access to the workspace agent's
// startup script logs, we can show them here.
// https://github.com/coder/coder/issues/2957
if agent.StartupScriptBehavior == codersdk.WorkspaceAgentStartupScriptBehaviorBlocking && opts.Wait {
switch agent.LifecycleState {
case codersdk.WorkspaceAgentLifecycleReady:
return nil
case codersdk.WorkspaceAgentLifecycleStartTimeout:
showMessage()
case codersdk.WorkspaceAgentLifecycleStartError:
showMessage()
return AgentStartError
case codersdk.WorkspaceAgentLifecycleShuttingDown, codersdk.WorkspaceAgentLifecycleShutdownTimeout,
codersdk.WorkspaceAgentLifecycleShutdownError, codersdk.WorkspaceAgentLifecycleOff:
showMessage()
return AgentShuttingDown
default:
select {
case <-warningShown:
showMessage()
default:
// This state is normal, we don't want
// to show a message prematurely.
case codersdk.WorkspaceAgentConnecting, codersdk.WorkspaceAgentTimeout:
// Since we were waiting for the agent to connect, also show
// startup logs if applicable.
showStartupLogs = true
stage := "Waiting for the workspace agent to connect"
sw.Start(stage)
for agent.Status == codersdk.WorkspaceAgentConnecting {
if agent, err = fetch(); err != nil {
return xerrors.Errorf("fetch: %w", err)
}
}
if agent.Status == codersdk.WorkspaceAgentTimeout {
now := time.Now()
sw.Log(now, codersdk.LogLevelInfo, "The workspace agent is having trouble connecting, wait for it to connect or restart your workspace.")
sw.Log(now, codersdk.LogLevelInfo, troubleshootingMessage(agent, "https://coder.com/docs/v2/latest/templates#agent-connection-issues"))
for agent.Status == codersdk.WorkspaceAgentTimeout {
if agent, err = fetch(); err != nil {
return xerrors.Errorf("fetch: %w", err)
}
}
continue
}
return nil
case codersdk.WorkspaceAgentTimeout, codersdk.WorkspaceAgentDisconnected:
showMessage()
}
}
}
sw.Complete(stage, agent.FirstConnectedAt.Sub(agent.CreatedAt))
type message struct {
Spin string
Prompt string
Troubleshoot bool
}
func waitingMessage(agent codersdk.WorkspaceAgent, opts AgentOptions) (m *message) {
m = &message{
Spin: fmt.Sprintf("Waiting for connection from %s...", DefaultStyles.Field.Render(agent.Name)),
Prompt: "Don't panic, your workspace is booting up!",
}
defer func() {
if agent.Status == codersdk.WorkspaceAgentConnected && !opts.Wait {
m.Spin = ""
}
if m.Spin != "" {
m.Spin = " " + m.Spin
}
// We don't want to wrap the troubleshooting URL, so we'll handle word
// wrapping ourselves (vs using lipgloss).
w := wordwrap.NewWriter(DefaultStyles.Paragraph.GetWidth() - DefaultStyles.Paragraph.GetMarginLeft()*2)
w.Breakpoints = []rune{' ', '\n'}
_, _ = fmt.Fprint(w, m.Prompt)
if m.Troubleshoot {
if agent.TroubleshootingURL != "" {
_, _ = fmt.Fprintf(w, " See troubleshooting instructions at:\n%s", agent.TroubleshootingURL)
} else {
_, _ = fmt.Fprint(w, " Wait for it to (re)connect or restart your workspace.")
case codersdk.WorkspaceAgentConnected:
if !showStartupLogs && agent.LifecycleState == codersdk.WorkspaceAgentLifecycleReady {
// The workspace is ready, there's nothing to do but connect.
return nil
}
}
_, _ = fmt.Fprint(w, "\n")
// We want to prefix the prompt with a caret, but we want text on the
// following lines to align with the text on the first line (i.e. added
// spacing).
ind := " " + DefaultStyles.Prompt.String()
iw := indent.NewWriter(1, func(w io.Writer) {
_, _ = w.Write([]byte(ind))
ind = " " // Set indentation to space after initial prompt.
})
_, _ = fmt.Fprint(iw, w.String())
m.Prompt = iw.String()
}()
stage := "Running workspace agent startup script"
follow := opts.Wait
if !follow {
stage += " (non-blocking)"
}
sw.Start(stage)
switch agent.Status {
case codersdk.WorkspaceAgentTimeout:
m.Prompt = "The workspace agent is having trouble connecting."
case codersdk.WorkspaceAgentDisconnected:
m.Prompt = "The workspace agent lost connection!"
case codersdk.WorkspaceAgentConnected:
m.Spin = fmt.Sprintf("Waiting for %s to become ready...", DefaultStyles.Field.Render(agent.Name))
m.Prompt = "Don't panic, your workspace agent has connected and the workspace is getting ready!"
if !opts.Wait {
m.Prompt = "Your workspace is still getting ready, it may be in an incomplete state."
}
err = func() error { // Use func because of defer in for loop.
logStream, logsCloser, err := opts.FetchLogs(ctx, agent.ID, 0, follow)
if err != nil {
return xerrors.Errorf("fetch workspace agent startup logs: %w", err)
}
defer logsCloser.Close()
for {
// This select is essentially and inline `fetch()`.
select {
case <-ctx.Done():
return ctx.Err()
case f := <-fetchedAgent:
if f.err != nil {
return xerrors.Errorf("fetch: %w", f.err)
}
// We could handle changes in the agent status here, like
// if the agent becomes disconnected, we may want to stop.
// But for now, we'll just keep going, hopefully the agent
// will reconnect and update its status.
agent = f.agent
case logs, ok := <-logStream:
if !ok {
return nil
}
for _, log := range logs {
sw.Log(log.CreatedAt, log.Level, log.Output)
}
}
}
}()
if err != nil {
return err
}
for follow && agent.LifecycleState.Starting() {
if agent, err = fetch(); err != nil {
return xerrors.Errorf("fetch: %w", err)
}
}
switch agent.LifecycleState {
case codersdk.WorkspaceAgentLifecycleStartTimeout:
m.Prompt = "The workspace is taking longer than expected to get ready, the agent startup script is still executing."
case codersdk.WorkspaceAgentLifecycleStartError:
m.Spin = ""
m.Prompt = "The workspace ran into a problem while getting ready, the agent startup script exited with non-zero status."
default:
switch agent.LifecycleState {
case codersdk.WorkspaceAgentLifecycleShutdownTimeout:
m.Spin = ""
m.Prompt = "The workspace is shutting down, but is taking longer than expected to shut down and the agent shutdown script is still executing."
m.Troubleshoot = true
case codersdk.WorkspaceAgentLifecycleShutdownError:
m.Spin = ""
m.Prompt = "The workspace ran into a problem while shutting down, the agent shutdown script exited with non-zero status."
m.Troubleshoot = true
case codersdk.WorkspaceAgentLifecycleShuttingDown:
m.Spin = ""
m.Prompt = "The workspace is shutting down."
case codersdk.WorkspaceAgentLifecycleOff:
m.Spin = ""
m.Prompt = "The workspace is not running."
case codersdk.WorkspaceAgentLifecycleReady:
sw.Complete(stage, agent.ReadyAt.Sub(*agent.StartedAt))
case codersdk.WorkspaceAgentLifecycleStartError:
sw.Fail(stage, agent.ReadyAt.Sub(*agent.StartedAt))
// Use zero time (omitted) to separate these from the startup logs.
sw.Log(time.Time{}, codersdk.LogLevelWarn, "Warning: The startup script exited with an error and your workspace may be incomplete.")
sw.Log(time.Time{}, codersdk.LogLevelWarn, troubleshootingMessage(agent, "https://coder.com/docs/v2/latest/templates#startup-script-exited-with-an-error"))
default:
switch {
case agent.LifecycleState.Starting():
// Use zero time (omitted) to separate these from the startup logs.
sw.Log(time.Time{}, codersdk.LogLevelWarn, "Notice: The startup script is still running and your workspace may be incomplete.")
sw.Log(time.Time{}, codersdk.LogLevelWarn, troubleshootingMessage(agent, "https://coder.com/docs/v2/latest/templates#your-workspace-may-be-incomplete"))
// Note: We don't complete or fail the stage here, it's
// intentionally left open to indicate this stage didn't
// complete.
case agent.LifecycleState.ShuttingDown():
// We no longer know if the startup script failed or not,
// but we need to tell the user something.
sw.Complete(stage, agent.ReadyAt.Sub(*agent.StartedAt))
return errAgentShuttingDown
}
}
// Not a failure state, no troubleshooting necessary.
return m
return nil
case codersdk.WorkspaceAgentDisconnected:
// If the agent was still starting during disconnect, we'll
// show startup logs.
showStartupLogs = agent.LifecycleState.Starting()
stage := "The workspace agent lost connection"
sw.Start(stage)
sw.Log(time.Now(), codersdk.LogLevelWarn, "Wait for it to reconnect or restart your workspace.")
sw.Log(time.Now(), codersdk.LogLevelWarn, troubleshootingMessage(agent, "https://coder.com/docs/v2/latest/templates#agent-connection-issues"))
for agent.Status == codersdk.WorkspaceAgentDisconnected {
if agent, err = fetch(); err != nil {
return xerrors.Errorf("fetch: %w", err)
}
}
sw.Complete(stage, agent.LastConnectedAt.Sub(*agent.DisconnectedAt))
}
default:
// Not a failure state, no troubleshooting necessary.
return m
}
m.Troubleshoot = true
}
func troubleshootingMessage(agent codersdk.WorkspaceAgent, url string) string {
m := "For more information and troubleshooting, see " + url
if agent.TroubleshootingURL != "" {
m += " and " + agent.TroubleshootingURL
}
return m
}
type closeFunc func() error
func (c closeFunc) Close() error {
return c()
}

View File

@ -1,363 +1,353 @@
package cliui_test
import (
"bufio"
"bytes"
"context"
"io"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/xerrors"
"github.com/coder/coder/cli/clibase"
"github.com/coder/coder/cli/clitest"
"github.com/coder/coder/cli/cliui"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/pty/ptytest"
"github.com/coder/coder/testutil"
)
func TestAgent(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
var disconnected atomic.Bool
ptty := ptytest.New(t)
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentDisconnected,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorNonBlocking,
}
if disconnected.Load() {
agent.Status = codersdk.WorkspaceAgentConnected
}
return agent, nil
},
FetchInterval: time.Millisecond,
WarnInterval: 10 * time.Millisecond,
})
return err
},
ptrTime := func(t time.Time) *time.Time {
return &t
}
inv := cmd.Invoke()
ptty.Attach(inv)
done := make(chan struct{})
go func() {
defer close(done)
err := inv.Run()
assert.NoError(t, err)
}()
ptty.ExpectMatchContext(ctx, "lost connection")
disconnected.Store(true)
<-done
}
func TestAgent_TimeoutWithTroubleshootingURL(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
wantURL := "https://coder.com/troubleshoot"
var connected, timeout atomic.Bool
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentConnecting,
TroubleshootingURL: wantURL,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorNonBlocking,
}
switch {
case !connected.Load() && timeout.Load():
agent.Status = codersdk.WorkspaceAgentTimeout
case connected.Load():
agent.Status = codersdk.WorkspaceAgentConnected
}
return agent, nil
},
for _, tc := range []struct {
name string
iter []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error
logs chan []codersdk.WorkspaceAgentStartupLog
opts cliui.AgentOptions
want []string
wantErr bool
}{
{
name: "Initial connection",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
WarnInterval: 5 * time.Millisecond,
})
return err
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnecting
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.FirstConnectedAt = ptrTime(time.Now())
close(logs)
return nil
},
},
want: []string{
"⧗ Waiting for the workspace agent to connect",
"✔ Waiting for the workspace agent to connect",
"⧗ Running workspace agent startup script (non-blocking)",
"Notice: The startup script is still running and your workspace may be incomplete.",
"For more information and troubleshooting, see",
},
},
}
ptty := ptytest.New(t)
inv := cmd.Invoke()
ptty.Attach(inv)
done := make(chan error, 1)
go func() {
done <- inv.WithContext(ctx).Run()
}()
ptty.ExpectMatchContext(ctx, "Don't panic, your workspace is booting")
timeout.Store(true)
ptty.ExpectMatchContext(ctx, wantURL)
connected.Store(true)
require.NoError(t, <-done)
}
func TestAgent_StartupTimeout(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
wantURL := "https://coder.com/this-is-a-really-long-troubleshooting-url-that-should-not-wrap"
var status, state atomic.String
setStatus := func(s codersdk.WorkspaceAgentStatus) { status.Store(string(s)) }
setState := func(s codersdk.WorkspaceAgentLifecycle) { state.Store(string(s)) }
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentConnecting,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorBlocking,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
TroubleshootingURL: wantURL,
}
if s := status.Load(); s != "" {
agent.Status = codersdk.WorkspaceAgentStatus(s)
}
if s := state.Load(); s != "" {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycle(s)
}
return agent, nil
{
name: "Initial connection timeout",
opts: cliui.AgentOptions{
FetchInterval: 1 * time.Millisecond,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnecting
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
agent.StartedAt = ptrTime(time.Now())
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentTimeout
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.FirstConnectedAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
agent.ReadyAt = ptrTime(time.Now())
close(logs)
return nil
},
},
want: []string{
"⧗ Waiting for the workspace agent to connect",
"The workspace agent is having trouble connecting, wait for it to connect or restart your workspace.",
"For more information and troubleshooting, see",
"✔ Waiting for the workspace agent to connect",
"⧗ Running workspace agent startup script (non-blocking)",
"✔ Running workspace agent startup script (non-blocking)",
},
},
{
name: "Disconnected",
opts: cliui.AgentOptions{
FetchInterval: 1 * time.Millisecond,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentDisconnected
agent.FirstConnectedAt = ptrTime(time.Now().Add(-1 * time.Minute))
agent.LastConnectedAt = ptrTime(time.Now().Add(-1 * time.Minute))
agent.DisconnectedAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
agent.StartedAt = ptrTime(time.Now().Add(-1 * time.Minute))
agent.ReadyAt = ptrTime(time.Now())
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.LastConnectedAt = ptrTime(time.Now())
return nil
},
func(_ context.Context, _ *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
close(logs)
return nil
},
},
want: []string{
"⧗ The workspace agent lost connection",
"Wait for it to reconnect or restart your workspace.",
"For more information and troubleshooting, see",
"✔ The workspace agent lost connection",
},
},
{
name: "Startup script logs",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
WarnInterval: time.Millisecond,
Wait: true,
})
return err
},
}
ptty := ptytest.New(t)
inv := cmd.Invoke()
ptty.Attach(inv)
done := make(chan error, 1)
go func() {
done <- inv.WithContext(ctx).Run()
}()
setStatus(codersdk.WorkspaceAgentConnecting)
ptty.ExpectMatchContext(ctx, "Don't panic, your workspace is booting")
setStatus(codersdk.WorkspaceAgentConnected)
setState(codersdk.WorkspaceAgentLifecycleStarting)
ptty.ExpectMatchContext(ctx, "workspace is getting ready")
setState(codersdk.WorkspaceAgentLifecycleStartTimeout)
ptty.ExpectMatchContext(ctx, "is taking longer")
ptty.ExpectMatchContext(ctx, wantURL)
setState(codersdk.WorkspaceAgentLifecycleReady)
require.NoError(t, <-done)
}
func TestAgent_StartErrorExit(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
wantURL := "https://coder.com/this-is-a-really-long-troubleshooting-url-that-should-not-wrap"
var status, state atomic.String
setStatus := func(s codersdk.WorkspaceAgentStatus) { status.Store(string(s)) }
setState := func(s codersdk.WorkspaceAgentLifecycle) { state.Store(string(s)) }
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentConnecting,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorBlocking,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
TroubleshootingURL: wantURL,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.FirstConnectedAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
agent.StartedAt = ptrTime(time.Now())
logs <- []codersdk.WorkspaceAgentStartupLog{
{
CreatedAt: time.Now(),
Output: "Hello world",
},
}
if s := status.Load(); s != "" {
agent.Status = codersdk.WorkspaceAgentStatus(s)
}
if s := state.Load(); s != "" {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycle(s)
}
return agent, nil
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
agent.ReadyAt = ptrTime(time.Now())
logs <- []codersdk.WorkspaceAgentStartupLog{
{
CreatedAt: time.Now(),
Output: "Bye now",
},
}
close(logs)
return nil
},
},
want: []string{
"⧗ Running workspace agent startup script",
"Hello world",
"Bye now",
"✔ Running workspace agent startup script",
},
},
{
name: "Startup script exited with error",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
WarnInterval: 60 * time.Second,
Wait: true,
})
return err
},
}
ptty := ptytest.New(t)
inv := cmd.Invoke()
ptty.Attach(inv)
done := make(chan error, 1)
go func() {
done <- inv.WithContext(ctx).Run()
}()
setStatus(codersdk.WorkspaceAgentConnected)
setState(codersdk.WorkspaceAgentLifecycleStarting)
ptty.ExpectMatchContext(ctx, "to become ready...")
setState(codersdk.WorkspaceAgentLifecycleStartError)
ptty.ExpectMatchContext(ctx, "ran into a problem")
err := <-done
require.ErrorIs(t, err, cliui.AgentStartError, "lifecycle start_error should exit with error")
}
func TestAgent_NoWait(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
wantURL := "https://coder.com/this-is-a-really-long-troubleshooting-url-that-should-not-wrap"
var status, state atomic.String
setStatus := func(s codersdk.WorkspaceAgentStatus) { status.Store(string(s)) }
setState := func(s codersdk.WorkspaceAgentLifecycle) { state.Store(string(s)) }
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentConnecting,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorBlocking,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
TroubleshootingURL: wantURL,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.FirstConnectedAt = ptrTime(time.Now())
agent.StartedAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStartError
agent.ReadyAt = ptrTime(time.Now())
logs <- []codersdk.WorkspaceAgentStartupLog{
{
CreatedAt: time.Now(),
Output: "Hello world",
},
}
if s := status.Load(); s != "" {
agent.Status = codersdk.WorkspaceAgentStatus(s)
}
if s := state.Load(); s != "" {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycle(s)
}
return agent, nil
close(logs)
return nil
},
FetchInterval: time.Millisecond,
WarnInterval: time.Second,
Wait: false,
})
return err
},
want: []string{
"⧗ Running workspace agent startup script",
"Hello world",
"✘ Running workspace agent startup script",
"Warning: The startup script exited with an error and your workspace may be incomplete.",
"For more information and troubleshooting, see",
},
},
}
ptty := ptytest.New(t)
inv := cmd.Invoke()
ptty.Attach(inv)
done := make(chan error, 1)
go func() {
done <- inv.WithContext(ctx).Run()
}()
setStatus(codersdk.WorkspaceAgentConnecting)
ptty.ExpectMatchContext(ctx, "Don't panic, your workspace is booting")
setStatus(codersdk.WorkspaceAgentConnected)
require.NoError(t, <-done, "created - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStarting)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "starting - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStartTimeout)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "start timeout - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStartError)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "start error - should exit early")
setState(codersdk.WorkspaceAgentLifecycleReady)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "ready - should exit early")
}
func TestAgent_StartupScriptBehaviorNonBlocking(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
wantURL := "https://coder.com/this-is-a-really-long-troubleshooting-url-that-should-not-wrap"
var status, state atomic.String
setStatus := func(s codersdk.WorkspaceAgentStatus) { status.Store(string(s)) }
setState := func(s codersdk.WorkspaceAgentLifecycle) { state.Store(string(s)) }
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "example",
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentConnecting,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorNonBlocking,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
TroubleshootingURL: wantURL,
}
if s := status.Load(); s != "" {
agent.Status = codersdk.WorkspaceAgentStatus(s)
}
if s := state.Load(); s != "" {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycle(s)
}
return agent, nil
},
{
name: "Error when shutting down",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentDisconnected
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleOff
close(logs)
return nil
},
},
wantErr: true,
},
{
name: "Error when shutting down while waiting",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
WarnInterval: time.Second,
Wait: true,
})
return err
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnected
agent.FirstConnectedAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
agent.StartedAt = ptrTime(time.Now())
logs <- []codersdk.WorkspaceAgentStartupLog{
{
CreatedAt: time.Now(),
Output: "Hello world",
},
}
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
agent.ReadyAt = ptrTime(time.Now())
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleShuttingDown
close(logs)
return nil
},
},
want: []string{
"⧗ Running workspace agent startup script",
"Hello world",
"✔ Running workspace agent startup script",
},
wantErr: true,
},
{
name: "Error during fetch",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
Wait: true,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentConnecting
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
return xerrors.New("bad")
},
},
want: []string{
"⧗ Waiting for the workspace agent to connect",
},
wantErr: true,
},
{
name: "Shows agent troubleshooting URL",
opts: cliui.AgentOptions{
FetchInterval: time.Millisecond,
Wait: true,
},
iter: []func(context.Context, *codersdk.WorkspaceAgent, chan []codersdk.WorkspaceAgentStartupLog) error{
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
agent.Status = codersdk.WorkspaceAgentTimeout
agent.TroubleshootingURL = "https://troubleshoot"
return nil
},
func(_ context.Context, agent *codersdk.WorkspaceAgent, _ chan []codersdk.WorkspaceAgentStartupLog) error {
return xerrors.New("bad")
},
},
want: []string{
"⧗ Waiting for the workspace agent to connect",
"The workspace agent is having trouble connecting, wait for it to connect or restart your workspace.",
"https://troubleshoot",
},
wantErr: true,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
var buf bytes.Buffer
agent := codersdk.WorkspaceAgent{
ID: uuid.New(),
Status: codersdk.WorkspaceAgentConnecting,
StartupScriptBehavior: codersdk.WorkspaceAgentStartupScriptBehaviorNonBlocking,
CreatedAt: time.Now(),
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
}
logs := make(chan []codersdk.WorkspaceAgentStartupLog, 1)
cmd := &clibase.Cmd{
Handler: func(inv *clibase.Invocation) error {
tc.opts.Fetch = func(_ context.Context) (codersdk.WorkspaceAgent, error) {
var err error
if len(tc.iter) > 0 {
err = tc.iter[0](ctx, &agent, logs)
tc.iter = tc.iter[1:]
}
return agent, err
}
tc.opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
return logs, closeFunc(func() error { return nil }), nil
}
err := cliui.Agent(inv.Context(), &buf, tc.opts)
return err
},
}
inv := cmd.Invoke()
w := clitest.StartWithWaiter(t, inv)
if tc.wantErr {
w.RequireError()
} else {
w.RequireSuccess()
}
s := bufio.NewScanner(&buf)
for s.Scan() {
line := s.Text()
t.Log(line)
if len(tc.want) == 0 {
require.Fail(t, "unexpected line: "+line)
}
require.Contains(t, line, tc.want[0])
tc.want = tc.want[1:]
}
require.NoError(t, s.Err())
if len(tc.want) > 0 {
require.Fail(t, "missing lines: "+strings.Join(tc.want, ", "))
}
})
}
inv := cmd.Invoke()
ptty := ptytest.New(t)
ptty.Attach(inv)
done := make(chan error, 1)
go func() {
done <- inv.WithContext(ctx).Run()
}()
setStatus(codersdk.WorkspaceAgentConnecting)
ptty.ExpectMatchContext(ctx, "Don't panic, your workspace is booting")
setStatus(codersdk.WorkspaceAgentConnected)
require.NoError(t, <-done, "created - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStarting)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "starting - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStartTimeout)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "start timeout - should exit early")
setState(codersdk.WorkspaceAgentLifecycleStartError)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "start error - should exit early")
setState(codersdk.WorkspaceAgentLifecycleReady)
go func() { done <- inv.WithContext(ctx).Run() }()
require.NoError(t, <-done, "ready - should exit early")
}

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
"os/signal"
"strings"
"sync"
"time"
@ -69,21 +70,20 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
jobMutex sync.Mutex
)
sw := &stageWriter{w: writer, verbose: opts.Verbose, silentLogs: opts.Silent}
printStage := func() {
_, _ = fmt.Fprintf(writer, "==> ⧗ %s\n", currentStage)
sw.Start(currentStage)
}
updateStage := func(stage string, startedAt time.Time) {
if currentStage != "" {
mark := "✔"
duration := startedAt.Sub(currentStageStartedAt)
if job.CompletedAt != nil && job.Status != codersdk.ProvisionerJobSucceeded {
mark = "✘"
sw.Fail(currentStage, duration)
} else {
sw.Complete(currentStage, duration)
}
dur := startedAt.Sub(currentStageStartedAt).Milliseconds()
if dur < 0 {
dur = 0
}
_, _ = fmt.Fprintf(writer, "=== %s %s [%dms]\n", mark, currentStage, dur)
}
if stage == "" {
return
@ -147,30 +147,15 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
}
defer closer.Close()
var (
// logOutput is where log output is written
logOutput = writer
// logBuffer is where logs are buffered if opts.Silent is true
logBuffer = &bytes.Buffer{}
)
if opts.Silent {
logOutput = logBuffer
}
flushLogBuffer := func() {
if opts.Silent {
_, _ = io.Copy(writer, logBuffer)
}
}
ticker := time.NewTicker(opts.FetchInterval)
defer ticker.Stop()
for {
select {
case err = <-errChan:
flushLogBuffer()
sw.Fail(currentStage, time.Since(currentStageStartedAt))
return err
case <-ctx.Done():
flushLogBuffer()
sw.Fail(currentStage, time.Since(currentStageStartedAt))
return ctx.Err()
case <-ticker.C:
updateJob()
@ -194,34 +179,89 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
Message: job.Error,
Code: job.ErrorCode,
}
sw.Fail(currentStage, time.Since(currentStageStartedAt))
jobMutex.Unlock()
flushLogBuffer()
return err
}
output := ""
switch log.Level {
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
if !opts.Verbose {
continue
}
output = DefaultStyles.Placeholder.Render(log.Output)
case codersdk.LogLevelError:
output = DefaultStyles.Error.Render(log.Output)
case codersdk.LogLevelWarn:
output = DefaultStyles.Warn.Render(log.Output)
case codersdk.LogLevelInfo:
output = log.Output
}
jobMutex.Lock()
if log.Stage != currentStage && log.Stage != "" {
updateStage(log.Stage, log.CreatedAt)
jobMutex.Unlock()
continue
}
_, _ = fmt.Fprintf(logOutput, "%s\n", output)
sw.Log(log.CreatedAt, log.Level, log.Output)
jobMutex.Unlock()
}
}
}
type stageWriter struct {
w io.Writer
verbose bool
silentLogs bool
logBuf bytes.Buffer
}
func (s *stageWriter) Start(stage string) {
_, _ = fmt.Fprintf(s.w, "==> ⧗ %s\n", stage)
}
func (s *stageWriter) Complete(stage string, duration time.Duration) {
s.end(stage, duration, true)
}
func (s *stageWriter) Fail(stage string, duration time.Duration) {
s.flushLogs()
s.end(stage, duration, false)
}
//nolint:revive
func (s *stageWriter) end(stage string, duration time.Duration, ok bool) {
s.logBuf.Reset()
mark := "✔"
if !ok {
mark = "✘"
}
if duration < 0 {
duration = 0
}
_, _ = fmt.Fprintf(s.w, "=== %s %s [%dms]\n", mark, stage, duration.Milliseconds())
}
func (s *stageWriter) Log(createdAt time.Time, level codersdk.LogLevel, line string) {
w := s.w
if s.silentLogs {
w = &s.logBuf
}
render := func(s ...string) string { return strings.Join(s, " ") }
var lines []string
if !createdAt.IsZero() {
lines = append(lines, createdAt.Local().Format("2006-01-02 15:04:05.000Z07:00"))
}
lines = append(lines, line)
switch level {
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
if !s.verbose {
return
}
render = DefaultStyles.Placeholder.Render
case codersdk.LogLevelError:
render = DefaultStyles.Error.Render
case codersdk.LogLevelWarn:
render = DefaultStyles.Warn.Render
case codersdk.LogLevelInfo:
}
_, _ = fmt.Fprintf(w, "%s\n", render(lines...))
}
func (s *stageWriter) flushLogs() {
if s.silentLogs {
_, _ = io.Copy(s.w, &s.logBuf)
}
s.logBuf.Reset()
}

View File

@ -91,7 +91,6 @@ func (r *RootCmd) portForward() *clibase.Cmd {
}
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},

View File

@ -35,19 +35,18 @@ func (r *RootCmd) speedtest() *clibase.Cmd {
ctx, cancel := context.WithCancel(inv.Context())
defer cancel()
workspace, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
_, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
if err != nil {
return err
}
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
Wait: false,
})
if err != nil && !xerrors.Is(err, cliui.AgentStartError) {
if err != nil {
return xerrors.Errorf("await agent: %w", err)
}

View File

@ -176,23 +176,16 @@ func (r *RootCmd) ssh() *clibase.Cmd {
// OpenSSH passes stderr directly to the calling TTY.
// This is required in "stdio" mode so a connecting indicator can be displayed.
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
Wait: wait,
FetchLogs: client.WorkspaceAgentStartupLogsAfter,
Wait: wait,
})
if err != nil {
if xerrors.Is(err, context.Canceled) {
return cliui.Canceled
}
if !xerrors.Is(err, cliui.AgentStartError) {
return xerrors.Errorf("await agent: %w", err)
}
// We don't want to fail on a startup script error because it's
// natural that the user will want to fix the script and try again.
// We don't print the error because cliui.Agent does that for us.
}
if r.disableDirect {

View File

@ -5,12 +5,14 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/url"
"os"
"strings"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/coder/coder/cli/clibase"
@ -164,25 +166,91 @@ func main() {
root.Children = append(root.Children, &clibase.Cmd{
Use: "agent",
Handler: func(inv *clibase.Invocation) error {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentDisconnected,
LifecycleState: codersdk.WorkspaceAgentLifecycleReady,
var agent codersdk.WorkspaceAgent
var logs []codersdk.WorkspaceAgentStartupLog
fetchSteps := []func(){
func() {
createdAt := time.Now().Add(-time.Minute)
agent = codersdk.WorkspaceAgent{
CreatedAt: createdAt,
Status: codersdk.WorkspaceAgentConnecting,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
}
},
func() {
time.Sleep(time.Second)
agent.Status = codersdk.WorkspaceAgentTimeout
},
func() {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
startingAt := time.Now()
agent.StartedAt = &startingAt
for i := 0; i < 10; i++ {
level := codersdk.LogLevelInfo
if rand.Float64() > 0.75 { //nolint:gosec
level = codersdk.LogLevelError
}
logs = append(logs, codersdk.WorkspaceAgentStartupLog{
CreatedAt: time.Now().Add(-time.Duration(10-i) * 144 * time.Millisecond),
Output: fmt.Sprintf("Some log %d", i),
Level: level,
})
}
},
func() {
time.Sleep(time.Second)
firstConnectedAt := time.Now()
agent.FirstConnectedAt = &firstConnectedAt
lastConnectedAt := firstConnectedAt.Add(0)
agent.LastConnectedAt = &lastConnectedAt
agent.Status = codersdk.WorkspaceAgentConnected
},
func() {},
func() {
time.Sleep(5 * time.Second)
agent.Status = codersdk.WorkspaceAgentConnected
lastConnectedAt := time.Now()
agent.LastConnectedAt = &lastConnectedAt
},
}
go func() {
time.Sleep(3 * time.Second)
agent.Status = codersdk.WorkspaceAgentConnected
}()
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "dev",
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
FetchInterval: 100 * time.Millisecond,
Wait: true,
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
if len(fetchSteps) == 0 {
return agent, nil
}
step := fetchSteps[0]
fetchSteps = fetchSteps[1:]
step()
return agent, nil
},
WarnInterval: 2 * time.Second,
FetchLogs: func(_ context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
logsC := make(chan []codersdk.WorkspaceAgentStartupLog, len(logs))
if follow {
go func() {
defer close(logsC)
for _, log := range logs {
logsC <- []codersdk.WorkspaceAgentStartupLog{log}
time.Sleep(144 * time.Millisecond)
}
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
readyAt := database.Now()
agent.ReadyAt = &readyAt
}()
} else {
logsC <- logs
close(logsC)
}
return logsC, closeFunc(func() error {
return nil
}), nil
},
})
if err != nil {
return err
}
_, _ = fmt.Printf("Completed!\n")
return nil
},
})
@ -278,3 +346,9 @@ func main() {
os.Exit(1)
}
}
type closeFunc func() error
func (f closeFunc) Close() error {
return f()
}

View File

@ -225,7 +225,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
})
require.NoError(t, err)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
require.NoError(t, err)
defer func() {
_ = closer.Close()
@ -338,7 +338,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
require.NoError(t, err)
defer func() {
_ = closer.Close()

View File

@ -11,6 +11,7 @@ import (
"net/http/cookiejar"
"net/netip"
"strconv"
"strings"
"time"
"github.com/google/uuid"
@ -64,6 +65,17 @@ func (l WorkspaceAgentLifecycle) Starting() bool {
}
}
// ShuttingDown returns true if the agent is in the process of shutting
// down or has shut down.
func (l WorkspaceAgentLifecycle) ShuttingDown() bool {
switch l {
case WorkspaceAgentLifecycleShuttingDown, WorkspaceAgentLifecycleShutdownTimeout, WorkspaceAgentLifecycleShutdownError, WorkspaceAgentLifecycleOff:
return true
default:
return false
}
}
// WorkspaceAgentLifecycleOrder is the order in which workspace agent
// lifecycle states are expected to be reported during the lifetime of
// the agent process. For instance, the agent can go from starting to
@ -536,20 +548,52 @@ func (c *Client) WorkspaceAgentListeningPorts(ctx context.Context, agentID uuid.
return listeningPorts, json.NewDecoder(res.Body).Decode(&listeningPorts)
}
func (c *Client) WorkspaceAgentStartupLogsAfter(ctx context.Context, agentID uuid.UUID, after int64) (<-chan []WorkspaceAgentStartupLog, io.Closer, error) {
afterQuery := ""
//nolint:revive // Follow is a control flag on the server as well.
func (c *Client) WorkspaceAgentStartupLogsAfter(ctx context.Context, agentID uuid.UUID, after int64, follow bool) (<-chan []WorkspaceAgentStartupLog, io.Closer, error) {
var queryParams []string
if after != 0 {
afterQuery = fmt.Sprintf("&after=%d", after)
queryParams = append(queryParams, fmt.Sprintf("after=%d", after))
}
followURL, err := c.URL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/startup-logs?follow%s", agentID, afterQuery))
if follow {
queryParams = append(queryParams, "follow")
}
var query string
if len(queryParams) > 0 {
query = "?" + strings.Join(queryParams, "&")
}
reqURL, err := c.URL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/startup-logs%s", agentID, query))
if err != nil {
return nil, nil, err
}
if !follow {
resp, err := c.Request(ctx, http.MethodGet, reqURL.String(), nil)
if err != nil {
return nil, nil, xerrors.Errorf("execute request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, nil, ReadBodyAsError(resp)
}
var logs []WorkspaceAgentStartupLog
err = json.NewDecoder(resp.Body).Decode(&logs)
if err != nil {
return nil, nil, xerrors.Errorf("decode startup logs: %w", err)
}
ch := make(chan []WorkspaceAgentStartupLog, 1)
ch <- logs
close(ch)
return ch, closeFunc(func() error { return nil }), nil
}
jar, err := cookiejar.New(nil)
if err != nil {
return nil, nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
jar.SetCookies(reqURL, []*http.Cookie{{
Name: SessionTokenCookie,
Value: c.SessionToken(),
}})
@ -557,7 +601,7 @@ func (c *Client) WorkspaceAgentStartupLogsAfter(ctx context.Context, agentID uui
Jar: jar,
Transport: c.HTTPClient.Transport,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
conn, res, err := websocket.Dial(ctx, reqURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})